source: fedd/fedd_experiment_control.py @ c922f23

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since c922f23 was f8582c9, checked in by Ted Faber <faber@…>, 16 years ago

Resource allocation and deallocation really working
Access handler selects allocation ID
Fedid allocation IDs work
Revamp of util code for maodifying messages (e.g. binaries)
Handlers now see fedids as objects in messages
Fedid bug in handlers in fedd_util

This should have been multiple commits

  • Property mode set to 100644
File size: 55.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47   
48    class thread_pool:
49        """
50        A class to keep track of a set of threads all invoked for the same
51        task.  Manages the mutual exclusion of the states.
52        """
53        def __init__(self):
54            """
55            Start a pool.
56            """
57            self.changed = Condition()
58            self.started = 0
59            self.terminated = 0
60
61        def acquire(self):
62            """
63            Get the pool's lock.
64            """
65            self.changed.acquire()
66
67        def release(self):
68            """
69            Release the pool's lock.
70            """
71            self.changed.release()
72
73        def wait(self, timeout = None):
74            """
75            Wait for a pool thread to start or stop.
76            """
77            self.changed.wait(timeout)
78
79        def start(self):
80            """
81            Called by a pool thread to report starting.
82            """
83            self.changed.acquire()
84            self.started += 1
85            self.changed.notifyAll()
86            self.changed.release()
87
88        def terminate(self):
89            """
90            Called by a pool thread to report finishing.
91            """
92            self.changed.acquire()
93            self.terminated += 1
94            self.changed.notifyAll()
95            self.changed.release()
96
97        def clear(self):
98            """
99            Clear all pool data.
100            """
101            self.changed.acquire()
102            self.started = 0
103            self.terminated =0
104            self.changed.notifyAll()
105            self.changed.release()
106
107    class pooled_thread(Thread):
108        """
109        One of a set of threads dedicated to a specific task.  Uses the
110        thread_pool class above for coordination.
111        """
112        def __init__(self, group=None, target=None, name=None, args=(), 
113                kwargs={}, pdata=None, trace_file=None):
114            Thread.__init__(self, group, target, name, args, kwargs)
115            self.rv = None          # Return value of the ops in this thread
116            self.exception = None   # Exception that terminated this thread
117            self.target=target      # Target function to run on start()
118            self.args = args        # Args to pass to target
119            self.kwargs = kwargs    # Additional kw args
120            self.pdata = pdata      # thread_pool for this class
121            # Logger for this thread
122            self.log = logging.getLogger("fedd.experiment_control")
123       
124        def run(self):
125            """
126            Emulate Thread.run, except add pool data manipulation and error
127            logging.
128            """
129            if self.pdata:
130                self.pdata.start()
131
132            if self.target:
133                try:
134                    self.rv = self.target(*self.args, **self.kwargs)
135                except service_error, s:
136                    self.exception = s
137                    self.log.error("Thread exception: %s %s" % \
138                            (s.code_string(), s.desc))
139                except:
140                    self.exception = sys.exc_info()[1]
141                    self.log.error(("Unexpected thread exception: %s" +\
142                            "Trace %s") % (self.exception,\
143                                traceback.format_exc()))
144            if self.pdata:
145                self.pdata.terminate()
146
147    def __init__(self, config=None):
148        """
149        Intialize the various attributes, most from the config object
150        """
151        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
152        self.thread_pool = fedd_experiment_control_local.thread_pool
153
154        self.cert_file = None
155        self.cert_pwd = None
156        self.trusted_certs = None
157
158        # Walk through the various relevant certificat specifying config
159        # attributes until the local certificate attributes can be resolved.
160        # The walk is from most specific to most general specification.
161        for s in ("experiment_control", "globals"):
162            if config.has_section(s): 
163                if config.has_option(s, "cert_file"):
164                    if not self.cert_file:
165                        self.cert_file = config.get(s, "cert_file")
166                        self.cert_pwd = config.get(s, "cert_pwd")
167
168                if config.has_option(s, "trusted_certs"):
169                    if not self.trusted_certs:
170                        self.trusted_certs = config.get(s, "trusted_certs")
171
172        self.exp_stem = "fed-stem"
173        self.log = logging.getLogger("fedd.experiment_control")
174        self.muxmax = 2
175        self.nthreads = 2
176        self.randomize_experiments = False
177
178        self.scp_exec = "/usr/bin/scp"
179        self.splitter = None
180        self.ssh_exec="/usr/bin/ssh"
181        self.ssh_keygen = "/usr/bin/ssh-keygen"
182        self.ssh_identity_file = None
183
184        if config.has_section("experiment_control"):
185            self.debug = config.get("experiment_control", "create_debug")
186            self.state_filename = config.get("experiment_control", 
187                    "experiment_state_file")
188            self.splitter_url = config.get("experiment_control", "splitter_url")
189            self.fedkit = config.get("experiment_control", "fedkit")
190        else:
191            self.debug = False
192            self.state_filename = None
193            self.splitter_url = None
194            self.fedkit = None
195
196        # XXX
197        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
198        self.ssh_type = "rsa"
199        self.state = { }
200        self.state_lock = Lock()
201        self.tclsh = "/usr/local/bin/otclsh"
202        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
203        self.tbmap = { 
204                'deter':'https://users.isi.deterlab.net:23235',
205                'emulab':'https://users.isi.deterlab.net:23236',
206                'ucb':'https://users.isi.deterlab.net:23237',
207                }
208        self.trace_file = sys.stderr
209
210        self.def_expstart = \
211                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
212                "/tmp/federate";
213        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
214                "FEDDIR/hosts";
215        self.def_gwstart = \
216                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
217                "/tmp/bridge.log";
218        self.def_mgwstart = \
219                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
220                "/tmp/bridge.log";
221        self.def_gwimage = "FBSD61-TUNNEL2";
222        self.def_gwtype = "pc";
223
224
225        if self.ssh_pubkey_file:
226            try:
227                f = open(self.ssh_pubkey_file, 'r')
228                self.ssh_pubkey = f.read()
229                f.close()
230            except IOError:
231                raise service_error(service_error.internal,
232                        "Cannot read sshpubkey")
233
234        set_log_level(config, "experiment_control", self.log)
235
236        # Grab saved state.  OK to do this w/o locking because it's read only
237        # and only one thread should be in existence that can see self.state at
238        # this point.
239        if self.state_filename:
240            self.read_state()
241
242        # Dispatch tables
243        self.soap_services = {\
244                'Create': make_soap_handler(\
245                        CreateRequestMessage.typecode,
246                        getattr(self, "create_experiment"), 
247                        CreateResponseMessage,
248                        "CreateResponseBody"),
249                'Vtopo': make_soap_handler(\
250                        VtopoRequestMessage.typecode,
251                        getattr(self, "get_vtopo"),
252                        VtopoResponseMessage,
253                        "VtopoResponseBody"),
254                'Vis': make_soap_handler(\
255                        VisRequestMessage.typecode,
256                        getattr(self, "get_vis"),
257                        VisResponseMessage,
258                        "VisResponseBody"),
259                'Info': make_soap_handler(\
260                        InfoRequestMessage.typecode,
261                        getattr(self, "get_info"),
262                        InfoResponseMessage,
263                        "InfoResponseBody"),
264                'Terminate': make_soap_handler(\
265                        TerminateRequestMessage.typecode,
266                        getattr(self, "terminate_experiment"),
267                        TerminateResponseMessage,
268                        "TerminateResponseBody"),
269        }
270
271        self.xmlrpc_services = {\
272                'Create': make_xmlrpc_handler(\
273                        getattr(self, "create_experiment"), 
274                        "CreateResponseBody"),
275                'Vtopo': make_xmlrpc_handler(\
276                        getattr(self, "get_vtopo"),
277                        "VtopoResponseBody"),
278                'Vis': make_xmlrpc_handler(\
279                        getattr(self, "get_vis"),
280                        "VisResponseBody"),
281                'Info': make_xmlrpc_handler(\
282                        getattr(self, "get_info"),
283                        "InfoResponseBody"),
284                'Terminate': make_xmlrpc_handler(\
285                        getattr(self, "terminate_experiment"),
286                        "TerminateResponseBody"),
287        }
288
289    def copy_file(self, src, dest, size=1024):
290        """
291        Exceedingly simple file copy.
292        """
293        s = open(src,'r')
294        d = open(dest, 'w')
295
296        buf = "x"
297        while buf != "":
298            buf = s.read(size)
299            d.write(buf)
300        s.close()
301        d.close()
302
303    # Call while holding self.state_lock
304    def write_state(self):
305        """
306        Write a new copy of experiment state after copying the existing state
307        to a backup.
308
309        State format is a simple pickling of the state dictionary.
310        """
311        if os.access(self.state_filename, os.W_OK):
312            self.copy_file(self.state_filename, \
313                    "%s.bak" % self.state_filename)
314        try:
315            f = open(self.state_filename, 'w')
316            pickle.dump(self.state, f)
317        except IOError, e:
318            self.log.error("Can't write file %s: %s" % \
319                    (self.state_filename, e))
320        except pickle.PicklingError, e:
321            self.log.error("Pickling problem: %s" % e)
322        except TypeError, e:
323            self.log.error("Pickling problem (TypeError): %s" % e)
324
325    # Call while holding self.state_lock
326    def read_state(self):
327        """
328        Read a new copy of experiment state.  Old state is overwritten.
329
330        State format is a simple pickling of the state dictionary.
331        """
332        try:
333            f = open(self.state_filename, "r")
334            self.state = pickle.load(f)
335            self.log.debug("[read_state]: Read state from %s" % \
336                    self.state_filename)
337        except IOError, e:
338            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
339                    % (self.state_filename, e))
340        except pickle.UnpicklingError, e:
341            self.log.warning(("[read_state]: No saved state: " + \
342                    "Unpickling failed: %s") % e)
343
344    def scp_file(self, file, user, host, dest=""):
345        """
346        scp a file to the remote host.  If debug is set the action is only
347        logged.
348        """
349
350        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
351        rv = 0
352
353        try:
354            dnull = open("/dev/null", "r")
355        except IOError:
356            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
357            dnull = Null
358
359        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
360        if not self.debug:
361            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
362            else: rv = call(scp_cmd)
363
364        return rv == 0
365
366    def ssh_cmd(self, user, host, cmd, wname=None):
367        """
368        Run a remote command on host as user.  If debug is set, the action is
369        only logged.
370        """
371        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
372
373        try:
374            dnull = open("/dev/null", "r")
375        except IOError:
376            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
377            dnull = Null
378
379        self.log.debug("[ssh_cmd]: %s" % sh_str)
380        if not self.debug:
381            if dnull:
382                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
383            else:
384                sub = Popen(sh_str, shell=True)
385            return sub.wait() == 0
386        else:
387            return True
388
389    def ship_configs(self, host, user, src_dir, dest_dir):
390        """
391        Copy federant-specific configuration files to the federant.
392        """
393        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
394            return False
395        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
396            return False
397
398        for f in os.listdir(src_dir):
399            if os.path.isdir(f):
400                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
401                        "%s/%s" % (dest_dir, f)):
402                    return False
403            else:
404                if not self.scp_file("%s/%s" % (src_dir, f), 
405                        user, host, dest_dir):
406                    return False
407        return True
408
409    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
410        """
411        Start a sub-experiment on a federant.
412
413        Get the current state, modify or create as appropriate, ship data and
414        configs and start the experiment.  There are small ordering differences
415        based on the initial state of the sub-experiment.
416        """
417        # ops node in the federant
418        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
419        user = tbparams[tb]['user']     # federant user
420        pid = tbparams[tb]['project']   # federant project
421        # XXX
422        base_confs = ( "hosts",)
423        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
424        # command to test experiment state
425        expinfo_exec = "/usr/testbed/bin/expinfo" 
426        # Configuration directories on the remote machine
427        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
428        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
429        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
430        # Regular expressions to parse the expinfo response
431        state_re = re.compile("State:\s+(\w+)")
432        no_exp_re = re.compile("^No\s+such\s+experiment")
433        state = None    # Experiment state parsed from expinfo
434        # The expinfo ssh command
435        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
436
437        # Get status
438        self.log.debug("[start_segment]: %s"% " ".join(cmd))
439        dev_null = None
440        try:
441            dev_null = open("/dev/null", "a")
442        except IOError, e:
443            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
444
445        if self.debug:
446            state = 'swapped'
447            rv = 0
448        else:
449            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
450            for line in status.stdout:
451                m = state_re.match(line)
452                if m: state = m.group(1)
453                else:
454                    m = no_exp_re.match(line)
455                    if m: state = "none"
456            rv = status.wait()
457
458        # If the experiment is not present the subcommand returns a non-zero
459        # return value.  If we successfully parsed a "none" outcome, ignore the
460        # return code.
461        if rv != 0 and state != "none":
462            raise service_error(service_error.internal,
463                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
464
465        self.log.debug("[start_segment]: %s: %s" % (tb, state))
466        self.log.info("[start_segment]:transferring experiment to %s" % tb)
467
468        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
469            return False
470        # Clear the federation files
471        if not self.ssh_cmd(user, host, 
472                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
473            return False
474        if not self.ssh_cmd(user, host, 
475                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
476            return False
477        # Clear and create the tarfiles and rpm directories
478        for d in (tarfiles_dir, rpms_dir):
479            if not self.ssh_cmd(user, host, 
480                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
481                return False
482            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
483                    "create tarfiles"):
484                return False
485       
486        if state == 'active':
487            # Remote experiment is active.  Modify it.
488            for f in base_confs:
489                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
490                        "%s/%s" % (proj_dir, f)):
491                    return False
492            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
493                    proj_dir):
494                return False
495            if os.path.isdir("%s/tarfiles" % tmpdir):
496                if not self.ship_configs(host, user,
497                        "%s/tarfiles" % tmpdir, tarfiles_dir):
498                    return False
499            if os.path.isdir("%s/rpms" % tmpdir):
500                if not self.ship_configs(host, user,
501                        "%s/rpms" % tmpdir, tarfiles_dir):
502                    return False
503            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
504            if not self.ssh_cmd(user, host,
505                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
506                            (pid, eid, tclfile), "modexp"):
507                return False
508            return True
509        elif state == "swapped":
510            # Remote experiment swapped out.  Modify it and swap it in.
511            for f in base_confs:
512                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
513                        "%s/%s" % (proj_dir, f)):
514                    return False
515            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
516                    proj_dir):
517                return False
518            if os.path.isdir("%s/tarfiles" % tmpdir):
519                if not self.ship_configs(host, user,
520                        "%s/tarfiles" % tmpdir, tarfiles_dir):
521                    return False
522            if os.path.isdir("%s/rpms" % tmpdir):
523                if not self.ship_configs(host, user,
524                        "%s/rpms" % tmpdir, tarfiles_dir):
525                    return False
526            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
527            if not self.ssh_cmd(user, host,
528                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
529                    "modexp"):
530                return False
531            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
532            if not self.ssh_cmd(user, host,
533                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
534                    "swapexp"):
535                return False
536            return True
537        elif state == "none":
538            # No remote experiment.  Create one.  We do this in 2 steps so we
539            # can put the configuration files and scripts into the new
540            # experiment directories.
541
542            # Tarfiles must be present for creation to work
543            if os.path.isdir("%s/tarfiles" % tmpdir):
544                if not self.ship_configs(host, user,
545                        "%s/tarfiles" % tmpdir, tarfiles_dir):
546                    return False
547            if os.path.isdir("%s/rpms" % tmpdir):
548                if not self.ship_configs(host, user,
549                        "%s/rpms" % tmpdir, tarfiles_dir):
550                    return False
551            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
552            if not self.ssh_cmd(user, host,
553                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
554                            (pid, eid, tclfile), "startexp"):
555                return False
556            # After startexp the per-experiment directories exist
557            for f in base_confs:
558                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
559                        "%s/%s" % (proj_dir, f)):
560                    return False
561            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
562                    proj_dir):
563                return False
564            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
565            if not self.ssh_cmd(user, host,
566                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
567                    "swapexp"):
568                return False
569            return True
570        else:
571            self.log.debug("[start_segment]:unknown state %s" % state)
572            return False
573
574    def stop_segment(self, tb, eid, tbparams):
575        """
576        Stop a sub experiment by calling swapexp on the federant
577        """
578        user = tbparams[tb]['user']
579        host = tbparams[tb]['host']
580        pid = tbparams[tb]['project']
581
582        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
583        return self.ssh_cmd(user, host,
584                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
585
586       
587    def generate_ssh_keys(self, dest, type="rsa" ):
588        """
589        Generate a set of keys for the gateways to use to talk.
590
591        Keys are of type type and are stored in the required dest file.
592        """
593        valid_types = ("rsa", "dsa")
594        t = type.lower();
595        if t not in valid_types: raise ValueError
596        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
597
598        try:
599            trace = open("/dev/null", "w")
600        except IOError:
601            raise service_error(service_error.internal,
602                    "Cannot open /dev/null??");
603
604        # May raise CalledProcessError
605        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
606        rv = call(cmd, stdout=trace, stderr=trace)
607        if rv != 0:
608            raise service_error(service_error.internal, 
609                    "Cannot generate nonce ssh keys.  %s return code %d" \
610                            % (self.ssh_keygen, rv))
611
612    def gentopo(self, str):
613        """
614        Generate the topology dtat structure from the splitter's XML
615        representation of it.
616
617        The topology XML looks like:
618            <experiment>
619                <nodes>
620                    <node><vname></vname><ips>ip1:ip2</ips></node>
621                </nodes>
622                <lans>
623                    <lan>
624                        <vname></vname><vnode></vnode><ip></ip>
625                        <bandwidth></bandwidth><member>node:port</member>
626                    </lan>
627                </lans>
628        """
629        class topo_parse:
630            """
631            Parse the topology XML and create the dats structure.
632            """
633            def __init__(self):
634                # Typing of the subelements for data conversion
635                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
636                self.int_subelements = ( 'bandwidth',)
637                self.float_subelements = ( 'delay',)
638                # The final data structure
639                self.nodes = [ ]
640                self.lans =  [ ]
641                self.topo = { \
642                        'node': self.nodes,\
643                        'lan' : self.lans,\
644                    }
645                self.element = { }  # Current element being created
646                self.chars = ""     # Last text seen
647
648            def end_element(self, name):
649                # After each sub element the contents is added to the current
650                # element or to the appropriate list.
651                if name == 'node':
652                    self.nodes.append(self.element)
653                    self.element = { }
654                elif name == 'lan':
655                    self.lans.append(self.element)
656                    self.element = { }
657                elif name in self.str_subelements:
658                    self.element[name] = self.chars
659                    self.chars = ""
660                elif name in self.int_subelements:
661                    self.element[name] = int(self.chars)
662                    self.chars = ""
663                elif name in self.float_subelements:
664                    self.element[name] = float(self.chars)
665                    self.chars = ""
666
667            def found_chars(self, data):
668                self.chars += data.rstrip()
669
670
671        tp = topo_parse();
672        parser = xml.parsers.expat.ParserCreate()
673        parser.EndElementHandler = tp.end_element
674        parser.CharacterDataHandler = tp.found_chars
675
676        parser.Parse(str)
677
678        return tp.topo
679       
680
681    def genviz(self, topo):
682        """
683        Generate the visualization the virtual topology
684        """
685
686        neato = "/usr/local/bin/neato"
687        # These are used to parse neato output and to create the visualization
688        # file.
689        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
690        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
691                "%s</type></node>"
692
693        try:
694            # Node names
695            nodes = [ n['vname'] for n in topo['node'] ]
696            topo_lans = topo['lan']
697        except KeyError:
698            raise service_error(service_error.internal, "Bad topology")
699
700        lans = { }
701        links = { }
702
703        # Walk through the virtual topology, organizing the connections into
704        # 2-node connections (links) and more-than-2-node connections (lans).
705        # When a lan is created, it's added to the list of nodes (there's a
706        # node in the visualization for the lan).
707        for l in topo_lans:
708            if links.has_key(l['vname']):
709                if len(links[l['vname']]) < 2:
710                    links[l['vname']].append(l['vnode'])
711                else:
712                    nodes.append(l['vname'])
713                    lans[l['vname']] = links[l['vname']]
714                    del links[l['vname']]
715                    lans[l['vname']].append(l['vnode'])
716            elif lans.has_key(l['vname']):
717                lans[l['vname']].append(l['vnode'])
718            else:
719                links[l['vname']] = [ l['vnode'] ]
720
721
722        # Open up a temporary file for dot to turn into a visualization
723        try:
724            df, dotname = tempfile.mkstemp()
725            dotfile = os.fdopen(df, 'w')
726        except IOError:
727            raise service_error(service_error.internal,
728                    "Failed to open file in genviz")
729
730        # Generate a dot/neato input file from the links, nodes and lans
731        try:
732            print >>dotfile, "graph G {"
733            for n in nodes:
734                print >>dotfile, '\t"%s"' % n
735            for l in links.keys():
736                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
737            for l in lans.keys():
738                for n in lans[l]:
739                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
740            print >>dotfile, "}"
741            dotfile.close()
742        except TypeError:
743            raise service_error(service_error.internal,
744                    "Single endpoint link in vtopo")
745        except IOError:
746            raise service_error(service_error.internal, "Cannot write dot file")
747
748        # Use dot to create a visualization
749        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
750                '-Gpack=true', dotname], stdout=PIPE)
751
752        # Translate dot to vis format
753        vis_nodes = [ ]
754        vis = { 'node': vis_nodes }
755        for line in dot.stdout:
756            m = vis_re.match(line)
757            if m:
758                vn = m.group(1)
759                vis_node = {'name': vn, \
760                        'x': float(m.group(2)),\
761                        'y' : float(m.group(3)),\
762                    }
763                if vn in links.keys() or vn in lans.keys():
764                    vis_node['type'] = 'lan'
765                else:
766                    vis_node['type'] = 'node'
767                vis_nodes.append(vis_node)
768        rv = dot.wait()
769
770        os.remove(dotname)
771        if rv == 0 : return vis
772        else: return None
773
774    def get_access(self, tb, nodes, user, tbparam):
775        """
776        Get access to testbed through fedd and set the parameters for that tb
777        """
778
779        translate_attr = {
780            'slavenodestartcmd': 'expstart',
781            'slaveconnectorstartcmd': 'gwstart',
782            'masternodestartcmd': 'mexpstart',
783            'masterconnectorstartcmd': 'mgwstart',
784            'connectorimage': 'gwimage',
785            'connectortype': 'gwtype',
786            'tunnelcfg': 'tun',
787            'smbshare': 'smbshare',
788        }
789
790        uri = self.tbmap.get(tb, None)
791        if not uri:
792            raise service_error(serice_error.server_config, 
793                    "Unknown testbed: %s" % tb)
794
795        # The basic request
796        req = {\
797                'destinationTestbed' : { 'uri' : uri },
798                'user':  user,
799                'allocID' : { 'localname': 'test' },
800                # XXX: need to get service access stright
801                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
802                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
803            }
804       
805        # node resources if any
806        if nodes != None and len(nodes) > 0:
807            rnodes = [ ]
808            for n in nodes:
809                rn = { }
810                image, hw, count = n.split(":")
811                if image: rn['image'] = [ image ]
812                if hw: rn['hardware'] = [ hw ]
813                if count: rn['count'] = int(count)
814                rnodes.append(rn)
815            req['resources']= { }
816            req['resources']['node'] = rnodes
817
818        # No retry loop here.  Proxy servers must correctly authenticate
819        # themselves without help
820
821        try:
822            ctx = fedd_ssl_context(self.cert_file, 
823                    self.trusted_certs, password=self.cert_pwd)
824        except SSL.SSLError:
825            raise service_error(service_error.server_config, 
826                    "Server certificates misconfigured")
827
828        loc = feddServiceLocator();
829        port = loc.getfeddPortType(uri,
830                transport=M2Crypto.httpslib.HTTPSConnection, 
831                transdict={ 'ssl_context' : ctx })
832
833        # Reconstruct the full request message
834        msg = RequestAccessRequestMessage()
835        msg.set_element_RequestAccessRequestBody(
836                pack_soap(msg, "RequestAccessRequestBody", req))
837
838        try:
839            resp = port.RequestAccess(msg)
840        except ZSI.ParseException, e:
841            raise service_error(service_error.req,
842                    "Bad format message (XMLRPC??): %s" % str(e))
843        except ZSI.FaultException, e:
844            resp = e.fault.detail[0]
845
846        # Again, weird incompatibilities rear their head.  userRoles, which are
847        # restricted strings, seem to be encoded by ZSI as non-unicode strings
848        # in a way that confuses the pickling and XMLRPC sending systems.
849        # Explicitly unicoding them seems to fix this, though it concerns me
850        # some.  It may be that these things are actually a ZSI string
851        # subclass, and the character encoding is not the major issue.  In any
852        # case, making all the subclasses of basestring into unicode strings
853        # unifies the response format and solves the problem.
854        r = make_unicode(fedids_to_obj(unpack_soap(resp)))
855
856        if r.has_key('RequestAccessResponseBody'):
857            r = r['RequestAccessResponseBody']
858        else:
859            raise service_error(service_error.proxy,
860                    "Bad proxy response")
861
862        e = r['emulab']
863        p = e['project']
864        tbparam[tb] = { 
865                "boss": e['boss'],
866                "host": e['ops'],
867                "domain": e['domain'],
868                "fs": e['fileServer'],
869                "eventserver": e['eventServer'],
870                "project": unpack_id(p['name']),
871                "emulab" : e,
872                "allocID" : r['allocID'],
873                }
874        # Make the testbed name be the label the user applied
875        p['testbed'] = {'localname': tb }
876
877        for u in p['user']:
878            tbparam[tb]['user'] = unpack_id(u['userID'])
879
880        for a in e['fedAttr']:
881            if a['attribute']:
882                key = translate_attr.get(a['attribute'].lower(), None)
883                if key:
884                    tbparam[tb][key]= a['value']
885       
886    def release_access(self, tb, aid):
887        """
888        Release access to testbed through fedd
889        """
890
891        uri = self.tbmap.get(tb, None)
892        if not uri:
893            raise service_error(serice_error.server_config, 
894                    "Unknown testbed: %s" % tb)
895
896        # The basic request
897        req = { 'allocID' : aid }
898
899        # No retry loop here.  Proxy servers must correctly authenticate
900        # themselves without help
901        try:
902            ctx = fedd_ssl_context(self.cert_file, 
903                    self.trusted_certs, password=self.cert_pwd)
904        except SSL.SSLError:
905            raise service_error(service_error.server_config, 
906                    "Server certificates misconfigured")
907
908        loc = feddServiceLocator();
909        port = loc.getfeddPortType(uri,
910                transport=M2Crypto.httpslib.HTTPSConnection, 
911                transdict={ 'ssl_context' : ctx })
912
913        # Reconstruct the full request message
914        msg = ReleaseAccessRequestMessage()
915        msg.set_element_ReleaseAccessRequestBody(
916                pack_soap(msg, "ReleaseAccessRequestBody", req))
917
918        try:
919            resp = port.ReleaseAccess(msg)
920        except ZSI.ParseException, e:
921            raise service_error(service_error.req,
922                    "Bad format message (XMLRPC??): %s" % str(e))
923        except ZSI.FaultException, e:
924            resp = e.fault.detail[0]
925
926        # better error coding
927
928    def remote_splitter(self, uri, desc, master):
929
930        req = {
931                'description' : { 'ns2description': desc },
932                'master': master,
933                'include_fedkit': bool(self.fedkit)
934            }
935
936        # No retry loop here.  Proxy servers must correctly authenticate
937        # themselves without help
938        try:
939            ctx = fedd_ssl_context(self.cert_file, 
940                    self.trusted_certs, password=self.cert_pwd)
941        except SSL.SSLError:
942            raise service_error(service_error.server_config, 
943                    "Server certificates misconfigured")
944
945        loc = feddInternalServiceLocator();
946        port = loc.getfeddInternalPortType(uri,
947                transport=M2Crypto.httpslib.HTTPSConnection, 
948                transdict={ 'ssl_context' : ctx })
949
950        # Reconstruct the full request message
951        msg = Ns2SplitRequestMessage()
952        msg.set_element_Ns2SplitRequestBody(
953                pack_soap(msg, "Ns2SplitRequestBody", req))
954
955        try:
956            resp = port.Ns2Split(msg)
957        except ZSI.ParseException, e:
958            raise service_error(service_error.req,
959                    "Bad format message (XMLRPC??): %s" %
960                    str(e))
961        r = unpack_soap(resp)
962        if r.has_key('Ns2SplitResponseBody'):
963            r = r['Ns2SplitResponseBody']
964            if r.has_key('output'):
965                return r['output'].splitlines()
966            else:
967                raise service_error(service_error.proxy, 
968                        "Bad splitter response (no output)")
969        else:
970            raise service_error(service_error.proxy, "Bad splitter response")
971       
972    class current_testbed:
973        """
974        Object for collecting the current testbed description.  The testbed
975        description is saved to a file with the local testbed variables
976        subsittuted line by line.
977        """
978        def __init__(self, eid, tmpdir, fedkit):
979            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
980            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
981            self.current_testbed = None
982            self.testbed_file = None
983
984            self.def_expstart = \
985                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
986            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
987            self.def_gwstart = \
988                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
989            self.def_mgwstart = \
990                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
991            self.def_gwimage = "FBSD61-TUNNEL2";
992            self.def_gwtype = "pc";
993
994            self.eid = eid
995            self.tmpdir = tmpdir
996            self.fedkit = fedkit
997
998        def __call__(self, line, master, allocated, tbparams):
999            # Capture testbed topology descriptions
1000            if self.current_testbed == None:
1001                m = self.begin_testbed.match(line)
1002                if m != None:
1003                    self.current_testbed = m.group(1)
1004                    if self.current_testbed == None:
1005                        raise service_error(service_error.req,
1006                                "Bad request format (unnamed testbed)")
1007                    allocated[self.current_testbed] = \
1008                            allocated.get(self.current_testbed,0) + 1
1009                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1010                    if not os.path.exists(tb_dir):
1011                        try:
1012                            os.mkdir(tb_dir)
1013                        except IOError:
1014                            raise service_error(service_error.internal,
1015                                    "Cannot create %s" % tb_dir)
1016                    try:
1017                        self.testbed_file = open("%s/%s.%s.tcl" %
1018                                (tb_dir, self.eid, self.current_testbed), 'w')
1019                    except IOError:
1020                        self.testbed_file = None
1021                    return True
1022                else: return False
1023            else:
1024                m = self.end_testbed.match(line)
1025                if m != None:
1026                    if m.group(1) != self.current_testbed:
1027                        raise service_error(service_error.internal, 
1028                                "Mismatched testbed markers!?")
1029                    if self.testbed_file != None: 
1030                        self.testbed_file.close()
1031                        self.testbed_file = None
1032                    self.current_testbed = None
1033                elif self.testbed_file:
1034                    # Substitute variables and put the line into the local
1035                    # testbed file.
1036                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1037                            self.def_gwtype)
1038                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1039                            self.def_gwimage)
1040                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1041                            self.def_mgwstart)
1042                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1043                            self.def_mexpstart)
1044                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1045                            self.def_gwstart)
1046                    expstart = tbparams[self.current_testbed].get('expstart', 
1047                            self.def_expstart)
1048                    project = tbparams[self.current_testbed].get('project')
1049                    line = re.sub("GWTYPE", gwtype, line)
1050                    line = re.sub("GWIMAGE", gwimage, line)
1051                    if self.current_testbed == master:
1052                        line = re.sub("GWSTART", mgwstart, line)
1053                        line = re.sub("EXPSTART", mexpstart, line)
1054                    else:
1055                        line = re.sub("GWSTART", gwstart, line)
1056                        line = re.sub("EXPSTART", expstart, line)
1057                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1058                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1059                    line = re.sub("EID", self.eid, line)
1060                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1061                            (project, self.eid), line)
1062                    if self.fedkit:
1063                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1064                                line)
1065                    print >>self.testbed_file, line
1066                return True
1067
1068    class allbeds:
1069        """
1070        Process the Allbeds section.  Get access to each federant and save the
1071        parameters in tbparams
1072        """
1073        def __init__(self, get_access):
1074            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1075            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1076            self.in_allbeds = False
1077            self.get_access = get_access
1078
1079        def __call__(self, line, user, tbparams):
1080            # Testbed access parameters
1081            if not self.in_allbeds:
1082                if self.begin_allbeds.match(line):
1083                    self.in_allbeds = True
1084                    return True
1085                else:
1086                    return False
1087            else:
1088                if self.end_allbeds.match(line):
1089                    self.in_allbeds = False
1090                else:
1091                    nodes = line.split('|')
1092                    tb = nodes.pop(0)
1093                    self.get_access(tb, nodes, user, tbparams)
1094                return True
1095
1096    class gateways:
1097        def __init__(self, eid, master, tmpdir, gw_pubkey,
1098                gw_secretkey, copy_file, fedkit):
1099            self.begin_gateways = \
1100                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1101            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1102            self.current_gateways = None
1103            self.control_gateway = None
1104            self.active_end = { }
1105
1106            self.eid = eid
1107            self.master = master
1108            self.tmpdir = tmpdir
1109            self.gw_pubkey_base = gw_pubkey
1110            self.gw_secretkey_base = gw_secretkey
1111
1112            self.copy_file = copy_file
1113            self.fedkit = fedkit
1114
1115
1116        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1117                active_end, tbparams, dtb, myname, desthost, type):
1118            """
1119            Produce a gateway configuration file from a gateways line.
1120            """
1121
1122            sproject = tbparams[gw].get('project', 'project')
1123            dproject = tbparams[dtb].get('project', 'project')
1124            sdomain = ".%s.%s%s" % (eid, sproject,
1125                    tbparams[gw].get('domain', ".example.com"))
1126            ddomain = ".%s.%s%s" % (eid, dproject,
1127                    tbparams[dtb].get('domain', ".example.com"))
1128            boss = tbparams[master].get('boss', "boss")
1129            fs = tbparams[master].get('fs', "fs")
1130            event_server = "%s%s" % \
1131                    (tbparams[gw].get('eventserver', "event_server"),
1132                            tbparams[gw].get('domain', "example.com"))
1133            remote_event_server = "%s%s" % \
1134                    (tbparams[dtb].get('eventserver', "event_server"),
1135                            tbparams[dtb].get('domain', "example.com"))
1136            seer_control = "%s%s" % \
1137                    (tbparams[gw].get('control', "control"), sdomain)
1138
1139            if self.fedkit:
1140                remote_script_dir = "/usr/local/federation/bin"
1141                local_script_dir = "/usr/local/federation/bin"
1142            else:
1143                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1144                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1145
1146            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1147            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1148            tunnel_cfg = tbparams[gw].get("tun", "false")
1149
1150            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1151            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1152
1153            # translate to lower case so the `hostname` hack for specifying
1154            # configuration files works.
1155            conf_file = conf_file.lower();
1156            remote_conf_file = remote_conf_file.lower();
1157
1158            if dtb == master:
1159                active = "false"
1160            elif gw == master:
1161                active = "true"
1162            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1163                active = "false"
1164            else:
1165                active_end['%s-%s' % (gw, dtb)] = 1
1166                active = "true"
1167
1168            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1169            print >>gwconfig, "Active: %s" % active
1170            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1171            print >>gwconfig, "BossName: %s" % boss
1172            print >>gwconfig, "FsName: %s" % fs
1173            print >>gwconfig, "EventServerName: %s" % event_server
1174            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1175            print >>gwconfig, "SeerControl: %s" % seer_control
1176            print >>gwconfig, "Type: %s" % type
1177            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1178            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1179                    local_script_dir
1180            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1181            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1182            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1183                    (remote_conf_dir, remote_conf_file)
1184            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1185            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1186            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1187            gwconfig.close()
1188
1189            return active == "true"
1190
1191        def __call__(self, line, allocated, tbparams):
1192            # Process gateways
1193            if not self.current_gateways:
1194                m = self.begin_gateways.match(line)
1195                if m:
1196                    self.current_gateways = m.group(1)
1197                    if allocated.has_key(self.current_gateways):
1198                        # This test should always succeed
1199                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1200                        if not os.path.exists(tb_dir):
1201                            try:
1202                                os.mkdir(tb_dir)
1203                            except IOError:
1204                                raise service_error(service_error.internal,
1205                                        "Cannot create %s" % tb_dir)
1206                    else:
1207                        # XXX
1208                        self.log.error("[gateways]: Ignoring gateways for " + \
1209                                "unknown testbed %s" % self.current_gateways)
1210                        self.current_gateways = None
1211                    return True
1212                else:
1213                    return False
1214            else:
1215                m = self.end_gateways.match(line)
1216                if m :
1217                    if m.group(1) != self.current_gateways:
1218                        raise service_error(service_error.internal,
1219                                "Mismatched gateway markers!?")
1220                    if self.control_gateway:
1221                        try:
1222                            cc = open("%s/%s/client.conf" %
1223                                    (self.tmpdir, self.current_gateways), 'w')
1224                            print >>cc, "ControlGateway: %s" % \
1225                                    self.control_gateway
1226                            if tbparams[self.master].has_key('smbshare'):
1227                                print >>cc, "SMBSHare: %s" % \
1228                                        tbparams[self.master]['smbshare']
1229                            print >>cc, "ProjectUser: %s" % \
1230                                    tbparams[self.master]['user']
1231                            print >>cc, "ProjectName: %s" % \
1232                                    tbparams[self.master]['project']
1233                            cc.close()
1234                        except IOError:
1235                            raise service_error(service_error.internal,
1236                                    "Error creating client config")
1237                        try:
1238                            cc = open("%s/%s/seer.conf" %
1239                                    (self.tmpdir, self.current_gateways),
1240                                    'w')
1241                            if self.current_gateways != self.master:
1242                                print >>cc, "ControlNode: %s" % \
1243                                        self.control_gateway
1244                            print >>cc, "ExperimentID: %s/%s" % \
1245                                    ( tbparams[self.master]['project'], \
1246                                    self.eid )
1247                            cc.close()
1248                        except IOError:
1249                            raise service_error(service_error.internal,
1250                                    "Error creating seer config")
1251                    else:
1252                        debug.error("[gateways]: No control gateway for %s" %\
1253                                    self.current_gateways)
1254                    self.current_gateways = None
1255                else:
1256                    dtb, myname, desthost, type = line.split(" ")
1257
1258                    if type == "control" or type == "both":
1259                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1260                                self.eid, 
1261                                tbparams[self.current_gateways]['project'],
1262                                tbparams[self.current_gateways]['domain'])
1263                    try:
1264                        active = self.gateway_conf_file(self.current_gateways,
1265                                self.master, self.eid, self.gw_pubkey_base,
1266                                self.gw_secretkey_base,
1267                                self.active_end, tbparams, dtb, myname,
1268                                desthost, type)
1269                    except IOError, e:
1270                        raise service_error(service_error.internal,
1271                                "Failed to write config file for %s" % \
1272                                        self.current_gateway)
1273           
1274                    gw_pubkey = "%s/keys/%s" % \
1275                            (self.tmpdir, self.gw_pubkey_base)
1276                    gw_secretkey = "%s/keys/%s" % \
1277                            (self.tmpdir, self.gw_secretkey_base)
1278
1279                    pkfile = "%s/%s/%s" % \
1280                            ( self.tmpdir, self.current_gateways, 
1281                                    self.gw_pubkey_base)
1282                    skfile = "%s/%s/%s" % \
1283                            ( self.tmpdir, self.current_gateways, 
1284                                    self.gw_secretkey_base)
1285
1286                    if not os.path.exists(pkfile):
1287                        try:
1288                            self.copy_file(gw_pubkey, pkfile)
1289                        except IOError:
1290                            service_error(service_error.internal,
1291                                    "Failed to copy pubkey file")
1292
1293                    if active and not os.path.exists(skfile):
1294                        try:
1295                            self.copy_file(gw_secretkey, skfile)
1296                        except IOError:
1297                            service_error(service_error.internal,
1298                                    "Failed to copy secretkey file")
1299                return True
1300
1301    class shunt_to_file:
1302        """
1303        Simple class to write data between two regexps to a file.
1304        """
1305        def __init__(self, begin, end, filename):
1306            """
1307            Begin shunting on a match of begin, stop on end, send data to
1308            filename.
1309            """
1310            self.begin = re.compile(begin)
1311            self.end = re.compile(end)
1312            self.in_shunt = False
1313            self.file = None
1314            self.filename = filename
1315
1316        def __call__(self, line):
1317            """
1318            Call this on each line in the input that may be shunted.
1319            """
1320            if not self.in_shunt:
1321                if self.begin.match(line):
1322                    self.in_shunt = True
1323                    try:
1324                        self.file = open(self.filename, "w")
1325                    except:
1326                        self.file = None
1327                        raise
1328                    return True
1329                else:
1330                    return False
1331            else:
1332                if self.end.match(line):
1333                    if self.file: 
1334                        self.file.close()
1335                        self.file = None
1336                    self.in_shunt = False
1337                else:
1338                    if self.file:
1339                        print >>self.file, line
1340                return True
1341
1342    class shunt_to_list:
1343        """
1344        Same interface as shunt_to_file.  Data collected in self.list, one list
1345        element per line.
1346        """
1347        def __init__(self, begin, end):
1348            self.begin = re.compile(begin)
1349            self.end = re.compile(end)
1350            self.in_shunt = False
1351            self.list = [ ]
1352       
1353        def __call__(self, line):
1354            if not self.in_shunt:
1355                if self.begin.match(line):
1356                    self.in_shunt = True
1357                    return True
1358                else:
1359                    return False
1360            else:
1361                if self.end.match(line):
1362                    self.in_shunt = False
1363                else:
1364                    self.list.append(line)
1365                return True
1366
1367    class shunt_to_string:
1368        """
1369        Same interface as shunt_to_file.  Data collected in self.str, all in
1370        one string.
1371        """
1372        def __init__(self, begin, end):
1373            self.begin = re.compile(begin)
1374            self.end = re.compile(end)
1375            self.in_shunt = False
1376            self.str = ""
1377       
1378        def __call__(self, line):
1379            if not self.in_shunt:
1380                if self.begin.match(line):
1381                    self.in_shunt = True
1382                    return True
1383                else:
1384                    return False
1385            else:
1386                if self.end.match(line):
1387                    self.in_shunt = False
1388                else:
1389                    self.str += line
1390                return True
1391
1392    def create_experiment(self, req, fid):
1393        """
1394        The external interface to experiment creation called from the
1395        dispatcher.
1396
1397        Creates a working directory, splits the incoming description using the
1398        splitter script and parses out the avrious subsections using the
1399        lcasses above.  Once each sub-experiment is created, use pooled threads
1400        to instantiate them and start it all up.
1401        """
1402        try:
1403            tmpdir = tempfile.mkdtemp(prefix="split-")
1404        except IOError:
1405            raise service_error(service_error.internal, "Cannot create tmp dir")
1406
1407        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1408        gw_secretkey_base = "fed.%s" % self.ssh_type
1409        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1410        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1411        tclfile = tmpdir + "/experiment.tcl"
1412        tbparams = { }
1413
1414        pid = "dummy"
1415        gid = "dummy"
1416        # XXX
1417        fail_soft = False
1418
1419        try:
1420            os.mkdir(tmpdir+"/keys")
1421        except OSError:
1422            raise service_error(service_error.internal,
1423                    "Can't make temporary dir")
1424
1425        req = req.get('CreateRequestBody', None)
1426        if not req:
1427            raise service_error(service_error.req,
1428                    "Bad request format (no CreateRequestBody)")
1429        # The tcl parser needs to read a file so put the content into that file
1430        descr=req.get('experimentdescription', None)
1431        if descr:
1432            file_content=descr.get('ns2description', None)
1433            if file_content:
1434                try:
1435                    f = open(tclfile, 'w')
1436                    f.write(file_content)
1437                    f.close()
1438                except IOError:
1439                    raise service_error(service_error.internal,
1440                            "Cannot write temp experiment description")
1441            else:
1442                raise service_error(service_error.req, 
1443                        "Only ns2descriptions supported")
1444        else:
1445            raise service_error(service_error.req, "No experiment description")
1446
1447        if req.has_key('experimentID') and \
1448                req['experimentID'].has_key('localname'):
1449            eid = req['experimentID']['localname']
1450            self.state_lock.acquire()
1451            while (self.state.has_key(eid)):
1452                eid += random.choice(string.ascii_letters)
1453            # To avoid another thread picking this localname
1454            self.state[eid] = "placeholder"
1455            self.state_lock.release()
1456        else:
1457            eid = self.exp_stem
1458            for i in range(0,5):
1459                eid += random.choice(string.ascii_letters)
1460            self.state_lock.acquire()
1461            while (self.state.has_key(eid)):
1462                eid = self.exp_stem
1463                for i in range(0,5):
1464                    eid += random.choice(string.ascii_letters)
1465            # To avoid another thread picking this localname
1466            self.state[eid] = "placeholder"
1467            self.state_lock.release()
1468
1469        try:
1470            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1471        except ValueError:
1472            raise service_error(service_error.server_config, 
1473                    "Bad key type (%s)" % self.ssh_type)
1474
1475        user = req.get('user', None)
1476        if user == None:
1477            raise service_error(service_error.req, "No user")
1478
1479        master = req.get('master', None)
1480        if master == None:
1481            raise service_error(service_error.req, "No master testbed label")
1482       
1483        if self.splitter_url:
1484            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1485            split_data = self.remote_splitter(self.splitter_url, file_content,
1486                    master)
1487        else:
1488            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1489                str(self.muxmax), '-m', master]
1490
1491            if self.fedkit:
1492                tclcmd.append('-k')
1493
1494            tclcmd.extend([pid, gid, eid, tclfile])
1495
1496            self.log.debug("running local splitter %s", " ".join(tclcmd))
1497            tclparser = Popen(tclcmd, stdout=PIPE)
1498            split_data = tclparser.stdout
1499
1500        allocated = { }     # Testbeds we can access
1501        started = { }       # Testbeds where a sub-experiment started
1502                            # successfully
1503
1504        # Objects to parse the splitter output (defined above)
1505        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1506        parse_allbeds = self.allbeds(self.get_access)
1507        parse_gateways = self.gateways(eid, master, tmpdir,
1508                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1509        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1510                    "^#\s+End\s+Vtopo")
1511        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1512                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1513        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1514                "^#\s+End\s+tarfiles")
1515        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1516                "^#\s+End\s+rpms")
1517
1518        # Worling on the split data
1519        for line in split_data:
1520            line = line.rstrip()
1521            if parse_current_testbed(line, master, allocated, tbparams):
1522                continue
1523            elif parse_allbeds(line, user, tbparams):
1524                continue
1525            elif parse_gateways(line, allocated, tbparams):
1526                continue
1527            elif parse_vtopo(line):
1528                continue
1529            elif parse_hostnames(line):
1530                continue
1531            elif parse_tarfiles(line):
1532                continue
1533            elif parse_rpms(line):
1534                continue
1535            else:
1536                raise service_error(service_error.internal, 
1537                        "Bad tcl parse? %s" % line)
1538
1539        # Virtual topology and visualization
1540        vtopo = self.gentopo(parse_vtopo.str)
1541        if not vtopo:
1542            raise service_error(service_error.internal, 
1543                    "Failed to generate virtual topology")
1544
1545        vis = self.genviz(vtopo)
1546        if not vis:
1547            raise service_error(service_error.internal, 
1548                    "Failed to generate visualization")
1549
1550        # save federant information
1551        for k in allocated.keys():
1552            tbparams[k]['federant'] = {\
1553                    'name': [ { 'localname' : eid} ],\
1554                    'emulab': tbparams[k]['emulab'],\
1555                    'allocID' : tbparams[k]['allocID'],\
1556                    'master' : k == master,\
1557                }
1558
1559
1560        # Copy tarfiles and rpms needed at remote sites into a staging area
1561        try:
1562            if self.fedkit:
1563                parse_tarfiles.list.append(self.fedkit)
1564            for t in parse_tarfiles.list:
1565                if not os.path.exists("%s/tarfiles" % tmpdir):
1566                    os.mkdir("%s/tarfiles" % tmpdir)
1567                self.copy_file(t, "%s/tarfiles/%s" % \
1568                        (tmpdir, os.path.basename(t)))
1569            for r in parse_rpms.list:
1570                if not os.path.exists("%s/rpms" % tmpdir):
1571                    os.mkdir("%s/rpms" % tmpdir)
1572                self.copy_file(r, "%s/rpms/%s" % \
1573                        (tmpdir, os.path.basename(r)))
1574        except IOError, e:
1575            raise service_error(service_error.internal, 
1576                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1577
1578        thread_pool_info = self.thread_pool()
1579        threads = [ ]
1580
1581        for tb in [ k for k in allocated.keys() if k != master]:
1582            # Wait until we have a free slot to start the next testbed load
1583            thread_pool_info.acquire()
1584            while thread_pool_info.started - \
1585                    thread_pool_info.terminated >= self.nthreads:
1586                thread_pool_info.wait()
1587            thread_pool_info.release()
1588
1589            # Create and start a thread to start the segment, and save it to
1590            # get the return value later
1591            t  = self.pooled_thread(target=self.start_segment, 
1592                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1593                    pdata=thread_pool_info, trace_file=self.trace_file)
1594            threads.append(t)
1595            t.start()
1596
1597        # Wait until all finish (the first clause of the while is to make sure
1598        # one starts)
1599        thread_pool_info.acquire()
1600        while thread_pool_info.started == 0 or \
1601                thread_pool_info.started > thread_pool_info.terminated:
1602            thread_pool_info.wait()
1603        thread_pool_info.release()
1604
1605        # If none failed, start the master
1606        failed = [ t.getName() for t in threads if not t.rv ]
1607
1608        if len(failed) == 0:
1609            if not self.start_segment(master, eid, tbparams, tmpdir):
1610                failed.append(master)
1611
1612        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1613        # If one failed clean up, unless fail_soft is set
1614        if failed:
1615            if not fail_soft:
1616                for tb in succeeded:
1617                    self.stop_segment(tb, eid, tbparams)
1618                # Remove the placeholder
1619                self.state_lock.acquire()
1620                del self.state[eid]
1621                self.state_lock.release()
1622
1623                raise service_error(service_error.federant,
1624                    "Swap in failed on %s" % ",".join(failed))
1625        else:
1626            self.log.info("[start_segment]: Experiment %s started" % eid)
1627
1628        # Generate an ID for the experiment (slice) and a certificate that the
1629        # allocator can use to prove they own it.  We'll ship it back through
1630        # the encrypted connection.
1631        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1632
1633        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1634
1635        # Walk up tmpdir, deleting as we go
1636        for path, dirs, files in os.walk(tmpdir, topdown=False):
1637            for f in files:
1638                os.remove(os.path.join(path, f))
1639            for d in dirs:
1640                os.rmdir(os.path.join(path, d))
1641        os.rmdir(tmpdir)
1642
1643        # The deepcopy prevents the allocation ID and other binaries from being
1644        # translated into other formats
1645        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1646                for tb in tbparams.keys() \
1647                    if tbparams[tb].has_key('federant') ],\
1648                    'vtopo': vtopo,\
1649                    'vis' : vis,
1650                    'experimentID' : [\
1651                            { 'fedid': copy.copy(expid) }, \
1652                            { 'localname': eid },\
1653                        ],\
1654                    'experimentAccess': { 'X509' : expcert },\
1655                }
1656
1657        # Insert the experiment into our state and update the disk copy
1658        self.state_lock.acquire()
1659        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1660                for tb in tbparams.keys() \
1661                    if tbparams[tb].has_key('federant') ],\
1662                    'vtopo': vtopo,\
1663                    'vis' : vis,
1664                    'experimentID' : [\
1665                            { 'fedid': expid }, { 'localname': eid },\
1666                        ],\
1667                }
1668        self.state[eid] = self.state[expid]
1669        if self.state_filename: self.write_state()
1670        self.state_lock.release()
1671
1672        if not failed:
1673            return resp
1674        else:
1675            raise service_error(service_error.partial, \
1676                    "Partial swap in on %s" % ",".join(succeeded))
1677
1678
1679    def get_vtopo(self, req, fid):
1680        """
1681        Return the stored virtual topology for this experiment
1682        """
1683        rv = None
1684
1685        req = req.get('VtopoRequestBody', None)
1686        if not req:
1687            raise service_error(service_error.req,
1688                    "Bad request format (no VtopoRequestBody)")
1689        exp = req.get('experiment', None)
1690        if exp:
1691            if exp.has_key('fedid'):
1692                key = exp['fedid']
1693                keytype = "fedid"
1694            elif exp.has_key('localname'):
1695                key = exp['localname']
1696                keytype = "localname"
1697            else:
1698                raise service_error(service_error.req, "Unknown lookup type")
1699        else:
1700            raise service_error(service_error.req, "No request?")
1701
1702        self.state_lock.acquire()
1703        if self.state.has_key(key):
1704            rv = { 'experiment' : {keytype: key },\
1705                    'vtopo': self.state[key]['vtopo'],\
1706                }
1707        self.state_lock.release()
1708
1709        if rv: return rv
1710        else: raise service_error(service_error.req, "No such experiment")
1711
1712    def get_vis(self, req, fid):
1713        """
1714        Return the stored visualization for this experiment
1715        """
1716        rv = None
1717
1718        req = req.get('VisRequestBody', None)
1719        if not req:
1720            raise service_error(service_error.req,
1721                    "Bad request format (no VisRequestBody)")
1722        exp = req.get('experiment', None)
1723        if exp:
1724            if exp.has_key('fedid'):
1725                key = exp['fedid']
1726                keytype = "fedid"
1727            elif exp.has_key('localname'):
1728                key = exp['localname']
1729                keytype = "localname"
1730            else:
1731                raise service_error(service_error.req, "Unknown lookup type")
1732        else:
1733            raise service_error(service_error.req, "No request?")
1734
1735        self.state_lock.acquire()
1736        if self.state.has_key(key):
1737            rv =  { 'experiment' : {keytype: key },\
1738                    'vis': self.state[key]['vis'],\
1739                    }
1740        self.state_lock.release()
1741
1742        if rv: return rv
1743        else: raise service_error(service_error.req, "No such experiment")
1744
1745    def get_info(self, req, fid):
1746        """
1747        Return all the stored info about this experiment
1748        """
1749        rv = None
1750
1751        req = req.get('InfoRequestBody', None)
1752        if not req:
1753            raise service_error(service_error.req,
1754                    "Bad request format (no VisRequestBody)")
1755        exp = req.get('experiment', None)
1756        if exp:
1757            if exp.has_key('fedid'):
1758                key = exp['fedid']
1759                keytype = "fedid"
1760            elif exp.has_key('localname'):
1761                key = exp['localname']
1762                keytype = "localname"
1763            else:
1764                raise service_error(service_error.req, "Unknown lookup type")
1765        else:
1766            raise service_error(service_error.req, "No request?")
1767
1768        # The state may be massaged by the service function that called
1769        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1770        # state.
1771        self.state_lock.acquire()
1772        if self.state.has_key(key):
1773            rv = copy.deepcopy(self.state[key])
1774        self.state_lock.release()
1775
1776        if rv: return rv
1777        else: raise service_error(service_error.req, "No such experiment")
1778
1779
1780    def terminate_experiment(self, req, fid):
1781        """
1782        Swap this experiment out on the federants and delete the shared
1783        information
1784        """
1785        tbparams = { }
1786        req = req.get('TerminateRequestBody', None)
1787        if not req:
1788            raise service_error(service_error.req,
1789                    "Bad request format (no TerminateRequestBody)")
1790        exp = req.get('experiment', None)
1791        if exp:
1792            if exp.has_key('fedid'):
1793                key = exp['fedid']
1794                keytype = "fedid"
1795            elif exp.has_key('localname'):
1796                key = exp['localname']
1797                keytype = "localname"
1798            else:
1799                raise service_error(service_error.req, "Unknown lookup type")
1800        else:
1801            raise service_error(service_error.req, "No request?")
1802
1803        self.state_lock.acquire()
1804        fed_exp = self.state.get(key, None)
1805
1806        if fed_exp:
1807            # This branch of the conditional holds the lock to generate a
1808            # consistent temporary tbparams variable to deallocate experiments.
1809            # It releases the lock to do the deallocations and reacquires it to
1810            # remove the experiment state when the termination is complete.
1811            ids = []
1812            #  experimentID is a list of dicts that are self-describing
1813            #  identifiers.  This finds all the fedids and localnames - the
1814            #  keys of self.state - and puts them into ids.
1815            for id in fed_exp.get('experimentID', []):
1816                if id.has_key('fedid'): ids.append(id['fedid'])
1817                if id.has_key('localname'): ids.append(id['localname'])
1818
1819            # Construct enough of the tbparams to make the stop_segment calls
1820            # work
1821            for fed in fed_exp['federant']:
1822                try:
1823                    for e in fed['name']:
1824                        eid = e.get('localname', None)
1825                        if eid: break
1826                    else:
1827                        continue
1828
1829                    p = fed['emulab']['project']
1830
1831                    project = p['name']['localname']
1832                    tb = p['testbed']['localname']
1833                    user = p['user'][0]['userID']['localname']
1834
1835                    domain = fed['emulab']['domain']
1836                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1837                    aid = fed['allocID']
1838                except KeyError, e:
1839                    continue
1840                tbparams[tb] = {\
1841                        'user': user,\
1842                        'domain': domain,\
1843                        'project': project,\
1844                        'host': host,\
1845                        'eid': eid,\
1846                        'aid': aid,\
1847                    }
1848            self.state_lock.release()
1849
1850            # Stop everyone.
1851            for tb in tbparams.keys():
1852                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1853
1854            # release the allocations
1855            for tb in tbparams.keys():
1856                self.release_access(tb, tbparams[tb]['aid'])
1857
1858            # Remove the terminated experiment
1859            self.state_lock.acquire()
1860            for id in ids:
1861                if self.state.has_key(id): del self.state[id]
1862
1863            if self.state_filename: self.write_state()
1864            self.state_lock.release()
1865
1866            return { 'experiment': exp }
1867        else:
1868            # Don't forget to release the lock
1869            self.state_lock.release()
1870            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.