source: fedd/fedd_experiment_control.py @ 721705e9

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 721705e9 was 721705e9, checked in by Ted Faber <faber@…>, 15 years ago

Split the service and creation access properly

  • Property mode set to 100644
File size: 54.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from fedd_util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26import parse_detail
27from service_error import service_error
28
29
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.experiment_control")
34fl.addHandler(nullHandler())
35
36class fedd_experiment_control_local:
37    """
38    Control of experiments that this system can directly access.
39
40    Includes experiment creation, termination and information dissemination.
41    Thred safe.
42    """
43   
44    class thread_pool:
45        """
46        A class to keep track of a set of threads all invoked for the same
47        task.  Manages the mutual exclusion of the states.
48        """
49        def __init__(self):
50            """
51            Start a pool.
52            """
53            self.changed = Condition()
54            self.started = 0
55            self.terminated = 0
56
57        def acquire(self):
58            """
59            Get the pool's lock.
60            """
61            self.changed.acquire()
62
63        def release(self):
64            """
65            Release the pool's lock.
66            """
67            self.changed.release()
68
69        def wait(self, timeout = None):
70            """
71            Wait for a pool thread to start or stop.
72            """
73            self.changed.wait(timeout)
74
75        def start(self):
76            """
77            Called by a pool thread to report starting.
78            """
79            self.changed.acquire()
80            self.started += 1
81            self.changed.notifyAll()
82            self.changed.release()
83
84        def terminate(self):
85            """
86            Called by a pool thread to report finishing.
87            """
88            self.changed.acquire()
89            self.terminated += 1
90            self.changed.notifyAll()
91            self.changed.release()
92
93        def clear(self):
94            """
95            Clear all pool data.
96            """
97            self.changed.acquire()
98            self.started = 0
99            self.terminated =0
100            self.changed.notifyAll()
101            self.changed.release()
102
103    class pooled_thread(Thread):
104        """
105        One of a set of threads dedicated to a specific task.  Uses the
106        thread_pool class above for coordination.
107        """
108        def __init__(self, group=None, target=None, name=None, args=(), 
109                kwargs={}, pdata=None, trace_file=None):
110            Thread.__init__(self, group, target, name, args, kwargs)
111            self.rv = None          # Return value of the ops in this thread
112            self.exception = None   # Exception that terminated this thread
113            self.target=target      # Target function to run on start()
114            self.args = args        # Args to pass to target
115            self.kwargs = kwargs    # Additional kw args
116            self.pdata = pdata      # thread_pool for this class
117            # Logger for this thread
118            self.log = logging.getLogger("fedd.experiment_control")
119       
120        def run(self):
121            """
122            Emulate Thread.run, except add pool data manipulation and error
123            logging.
124            """
125            if self.pdata:
126                self.pdata.start()
127
128            if self.target:
129                try:
130                    self.rv = self.target(*self.args, **self.kwargs)
131                except service_error, s:
132                    self.exception = s
133                    self.log.error("Thread exception: %s %s" % \
134                            (s.code_string(), s.desc))
135                except:
136                    self.exception = sys.exc_info()[1]
137                    self.log.error(("Unexpected thread exception: %s" +\
138                            "Trace %s") % (self.exception,\
139                                traceback.format_exc()))
140            if self.pdata:
141                self.pdata.terminate()
142
143    call_RequestAccess = service_caller('RequestAccess',
144            'getfeddPortType', feddServiceLocator, 
145            RequestAccessRequestMessage, 'RequestAccessRequestBody')
146
147    call_ReleaseAccess = service_caller('ReleaseAccess',
148            'getfeddPortType', feddServiceLocator, 
149            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
150
151    call_Ns2Split = service_caller('Ns2Split',
152            'getfeddInternalPortType', feddInternalServiceLocator, 
153            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
154
155    def __init__(self, config=None, auth=None):
156        """
157        Intialize the various attributes, most from the config object
158        """
159        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
160        self.thread_pool = fedd_experiment_control_local.thread_pool
161
162        self.cert_file = None
163        self.cert_pwd = None
164        self.trusted_certs = None
165
166        # Walk through the various relevant certificat specifying config
167        # attributes until the local certificate attributes can be resolved.
168        # The walk is from most specific to most general specification.
169        for s in ("experiment_control", "globals"):
170            if config.has_section(s): 
171                if config.has_option(s, "cert_file"):
172                    if not self.cert_file:
173                        self.cert_file = config.get(s, "cert_file")
174                        self.cert_pwd = config.get(s, "cert_pwd")
175
176                if config.has_option(s, "trusted_certs"):
177                    if not self.trusted_certs:
178                        self.trusted_certs = config.get(s, "trusted_certs")
179
180        self.exp_stem = "fed-stem"
181        self.log = logging.getLogger("fedd.experiment_control")
182        self.muxmax = 2
183        self.nthreads = 2
184        self.randomize_experiments = False
185
186        self.scp_exec = "/usr/bin/scp"
187        self.splitter = None
188        self.ssh_exec="/usr/bin/ssh"
189        self.ssh_keygen = "/usr/bin/ssh-keygen"
190        self.ssh_identity_file = None
191
192        if config.has_section("experiment_control"):
193            self.debug = config.get("experiment_control", "create_debug")
194            self.state_filename = config.get("experiment_control", 
195                    "experiment_state_file")
196            self.splitter_url = config.get("experiment_control", "splitter_url")
197            self.fedkit = config.get("experiment_control", "fedkit")
198        else:
199            self.debug = False
200            self.state_filename = None
201            self.splitter_url = None
202            self.fedkit = None
203
204        self.ssh_pubkey_file = config.get("experiment_control", 
205                "ssh_pubkey_file")
206        self.ssh_privkey_file = config.get("experiment_control",
207                "ssh_privkey_file")
208        # NB for internal master/slave ops, not experiment setup
209        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
210        self.state = { }
211        self.state_lock = Lock()
212        self.tclsh = "/usr/local/bin/otclsh"
213        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
214        self.tbmap = { 
215                'deter':'https://users.isi.deterlab.net:23235',
216                'emulab':'https://users.isi.deterlab.net:23236',
217                'ucb':'https://users.isi.deterlab.net:23237',
218                }
219        self.trace_file = sys.stderr
220
221        self.def_expstart = \
222                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
223                "/tmp/federate";
224        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
225                "FEDDIR/hosts";
226        self.def_gwstart = \
227                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
228                "/tmp/bridge.log";
229        self.def_mgwstart = \
230                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
231                "/tmp/bridge.log";
232        self.def_gwimage = "FBSD61-TUNNEL2";
233        self.def_gwtype = "pc";
234
235
236        if self.ssh_pubkey_file:
237            try:
238                print "reading %s" % self.ssh_pubkey_file
239                f = open(self.ssh_pubkey_file, 'r')
240                self.ssh_pubkey = f.read()
241                f.close()
242            except IOError:
243                raise service_error(service_error.internal,
244                        "Cannot read sshpubkey")
245        else:
246            raise service_error(service_error.internal, 
247                    "No SSH public key file?")
248
249        if not self.ssh_privkey_file:
250            raise service_error(service_error.internal, 
251                    "No SSH public key file?")
252
253        set_log_level(config, "experiment_control", self.log)
254
255        # Grab saved state.  OK to do this w/o locking because it's read only
256        # and only one thread should be in existence that can see self.state at
257        # this point.
258        if self.state_filename:
259            self.read_state()
260
261        # Dispatch tables
262        self.soap_services = {\
263                'Create': soap_handler(\
264                        CreateRequestMessage.typecode,
265                        getattr(self, "create_experiment"), 
266                        CreateResponseMessage,
267                        "CreateResponseBody"),
268                'Vtopo': soap_handler(\
269                        VtopoRequestMessage.typecode,
270                        getattr(self, "get_vtopo"),
271                        VtopoResponseMessage,
272                        "VtopoResponseBody"),
273                'Vis': soap_handler(\
274                        VisRequestMessage.typecode,
275                        getattr(self, "get_vis"),
276                        VisResponseMessage,
277                        "VisResponseBody"),
278                'Info': soap_handler(\
279                        InfoRequestMessage.typecode,
280                        getattr(self, "get_info"),
281                        InfoResponseMessage,
282                        "InfoResponseBody"),
283                'Terminate': soap_handler(\
284                        TerminateRequestMessage.typecode,
285                        getattr(self, "terminate_experiment"),
286                        TerminateResponseMessage,
287                        "TerminateResponseBody"),
288        }
289
290        self.xmlrpc_services = {\
291                'Create': xmlrpc_handler(\
292                        getattr(self, "create_experiment"), 
293                        "CreateResponseBody"),
294                'Vtopo': xmlrpc_handler(\
295                        getattr(self, "get_vtopo"),
296                        "VtopoResponseBody"),
297                'Vis': xmlrpc_handler(\
298                        getattr(self, "get_vis"),
299                        "VisResponseBody"),
300                'Info': xmlrpc_handler(\
301                        getattr(self, "get_info"),
302                        "InfoResponseBody"),
303                'Terminate': xmlrpc_handler(\
304                        getattr(self, "terminate_experiment"),
305                        "TerminateResponseBody"),
306        }
307
308    def copy_file(self, src, dest, size=1024):
309        """
310        Exceedingly simple file copy.
311        """
312        s = open(src,'r')
313        d = open(dest, 'w')
314
315        buf = "x"
316        while buf != "":
317            buf = s.read(size)
318            d.write(buf)
319        s.close()
320        d.close()
321
322    # Call while holding self.state_lock
323    def write_state(self):
324        """
325        Write a new copy of experiment state after copying the existing state
326        to a backup.
327
328        State format is a simple pickling of the state dictionary.
329        """
330        if os.access(self.state_filename, os.W_OK):
331            self.copy_file(self.state_filename, \
332                    "%s.bak" % self.state_filename)
333        try:
334            f = open(self.state_filename, 'w')
335            pickle.dump(self.state, f)
336        except IOError, e:
337            self.log.error("Can't write file %s: %s" % \
338                    (self.state_filename, e))
339        except pickle.PicklingError, e:
340            self.log.error("Pickling problem: %s" % e)
341        except TypeError, e:
342            self.log.error("Pickling problem (TypeError): %s" % e)
343
344    # Call while holding self.state_lock
345    def read_state(self):
346        """
347        Read a new copy of experiment state.  Old state is overwritten.
348
349        State format is a simple pickling of the state dictionary.
350        """
351        try:
352            f = open(self.state_filename, "r")
353            self.state = pickle.load(f)
354            self.log.debug("[read_state]: Read state from %s" % \
355                    self.state_filename)
356        except IOError, e:
357            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
358                    % (self.state_filename, e))
359        except pickle.UnpicklingError, e:
360            self.log.warning(("[read_state]: No saved state: " + \
361                    "Unpickling failed: %s") % e)
362
363    def scp_file(self, file, user, host, dest=""):
364        """
365        scp a file to the remote host.  If debug is set the action is only
366        logged.
367        """
368
369        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
370                "%s@%s:%s" % (user, host, dest)]
371        rv = 0
372
373        try:
374            dnull = open("/dev/null", "r")
375        except IOError:
376            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
377            dnull = Null
378
379        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
380        if not self.debug:
381            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
382            else: rv = call(scp_cmd)
383
384        return rv == 0
385
386    def ssh_cmd(self, user, host, cmd, wname=None):
387        """
388        Run a remote command on host as user.  If debug is set, the action is
389        only logged.
390        """
391        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
392                user, host, cmd)
393
394        try:
395            dnull = open("/dev/null", "r")
396        except IOError:
397            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
398            dnull = Null
399
400        self.log.debug("[ssh_cmd]: %s" % sh_str)
401        if not self.debug:
402            if dnull:
403                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
404            else:
405                sub = Popen(sh_str, shell=True)
406            return sub.wait() == 0
407        else:
408            return True
409
410    def ship_configs(self, host, user, src_dir, dest_dir):
411        """
412        Copy federant-specific configuration files to the federant.
413        """
414        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
415            return False
416        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
417            return False
418
419        for f in os.listdir(src_dir):
420            if os.path.isdir(f):
421                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
422                        "%s/%s" % (dest_dir, f)):
423                    return False
424            else:
425                if not self.scp_file("%s/%s" % (src_dir, f), 
426                        user, host, dest_dir):
427                    return False
428        return True
429
430    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
431        """
432        Start a sub-experiment on a federant.
433
434        Get the current state, modify or create as appropriate, ship data and
435        configs and start the experiment.  There are small ordering differences
436        based on the initial state of the sub-experiment.
437        """
438        # ops node in the federant
439        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
440        user = tbparams[tb]['user']     # federant user
441        pid = tbparams[tb]['project']   # federant project
442        # XXX
443        base_confs = ( "hosts",)
444        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
445        # command to test experiment state
446        expinfo_exec = "/usr/testbed/bin/expinfo" 
447        # Configuration directories on the remote machine
448        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
449        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
450        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
451        # Regular expressions to parse the expinfo response
452        state_re = re.compile("State:\s+(\w+)")
453        no_exp_re = re.compile("^No\s+such\s+experiment")
454        state = None    # Experiment state parsed from expinfo
455        # The expinfo ssh command
456        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
457
458        # Get status
459        self.log.debug("[start_segment]: %s"% " ".join(cmd))
460        dev_null = None
461        try:
462            dev_null = open("/dev/null", "a")
463        except IOError, e:
464            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
465
466        if self.debug:
467            state = 'swapped'
468            rv = 0
469        else:
470            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
471            for line in status.stdout:
472                m = state_re.match(line)
473                if m: state = m.group(1)
474                else:
475                    m = no_exp_re.match(line)
476                    if m: state = "none"
477            rv = status.wait()
478
479        # If the experiment is not present the subcommand returns a non-zero
480        # return value.  If we successfully parsed a "none" outcome, ignore the
481        # return code.
482        if rv != 0 and state != "none":
483            raise service_error(service_error.internal,
484                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
485
486        self.log.debug("[start_segment]: %s: %s" % (tb, state))
487        self.log.info("[start_segment]:transferring experiment to %s" % tb)
488
489        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
490            return False
491        # Clear the federation files
492        if not self.ssh_cmd(user, host, 
493                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
494            return False
495        if not self.ssh_cmd(user, host, 
496                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
497            return False
498        # Clear and create the tarfiles and rpm directories
499        for d in (tarfiles_dir, rpms_dir):
500            if not self.ssh_cmd(user, host, 
501                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
502                return False
503            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
504                    "create tarfiles"):
505                return False
506       
507        if state == 'active':
508            # Remote experiment is active.  Modify it.
509            for f in base_confs:
510                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
511                        "%s/%s" % (proj_dir, f)):
512                    return False
513            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
514                    proj_dir):
515                return False
516            if os.path.isdir("%s/tarfiles" % tmpdir):
517                if not self.ship_configs(host, user,
518                        "%s/tarfiles" % tmpdir, tarfiles_dir):
519                    return False
520            if os.path.isdir("%s/rpms" % tmpdir):
521                if not self.ship_configs(host, user,
522                        "%s/rpms" % tmpdir, tarfiles_dir):
523                    return False
524            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
525            if not self.ssh_cmd(user, host,
526                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
527                            (pid, eid, tclfile), "modexp"):
528                return False
529            return True
530        elif state == "swapped":
531            # Remote experiment swapped out.  Modify it and swap it in.
532            for f in base_confs:
533                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
534                        "%s/%s" % (proj_dir, f)):
535                    return False
536            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
537                    proj_dir):
538                return False
539            if os.path.isdir("%s/tarfiles" % tmpdir):
540                if not self.ship_configs(host, user,
541                        "%s/tarfiles" % tmpdir, tarfiles_dir):
542                    return False
543            if os.path.isdir("%s/rpms" % tmpdir):
544                if not self.ship_configs(host, user,
545                        "%s/rpms" % tmpdir, tarfiles_dir):
546                    return False
547            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
548            if not self.ssh_cmd(user, host,
549                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
550                    "modexp"):
551                return False
552            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
553            if not self.ssh_cmd(user, host,
554                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
555                    "swapexp"):
556                return False
557            return True
558        elif state == "none":
559            # No remote experiment.  Create one.  We do this in 2 steps so we
560            # can put the configuration files and scripts into the new
561            # experiment directories.
562
563            # Tarfiles must be present for creation to work
564            if os.path.isdir("%s/tarfiles" % tmpdir):
565                if not self.ship_configs(host, user,
566                        "%s/tarfiles" % tmpdir, tarfiles_dir):
567                    return False
568            if os.path.isdir("%s/rpms" % tmpdir):
569                if not self.ship_configs(host, user,
570                        "%s/rpms" % tmpdir, tarfiles_dir):
571                    return False
572            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
573            if not self.ssh_cmd(user, host,
574                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
575                            (pid, eid, tclfile), "startexp"):
576                return False
577            # After startexp the per-experiment directories exist
578            for f in base_confs:
579                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
580                        "%s/%s" % (proj_dir, f)):
581                    return False
582            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
583                    proj_dir):
584                return False
585            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
586            if not self.ssh_cmd(user, host,
587                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
588                    "swapexp"):
589                return False
590            return True
591        else:
592            self.log.debug("[start_segment]:unknown state %s" % state)
593            return False
594
595    def stop_segment(self, tb, eid, tbparams):
596        """
597        Stop a sub experiment by calling swapexp on the federant
598        """
599        user = tbparams[tb]['user']
600        host = tbparams[tb]['host']
601        pid = tbparams[tb]['project']
602
603        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
604        return self.ssh_cmd(user, host,
605                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
606
607       
608    def generate_ssh_keys(self, dest, type="rsa" ):
609        """
610        Generate a set of keys for the gateways to use to talk.
611
612        Keys are of type type and are stored in the required dest file.
613        """
614        valid_types = ("rsa", "dsa")
615        t = type.lower();
616        if t not in valid_types: raise ValueError
617        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
618
619        try:
620            trace = open("/dev/null", "w")
621        except IOError:
622            raise service_error(service_error.internal,
623                    "Cannot open /dev/null??");
624
625        # May raise CalledProcessError
626        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
627        rv = call(cmd, stdout=trace, stderr=trace)
628        if rv != 0:
629            raise service_error(service_error.internal, 
630                    "Cannot generate nonce ssh keys.  %s return code %d" \
631                            % (self.ssh_keygen, rv))
632
633    def gentopo(self, str):
634        """
635        Generate the topology dtat structure from the splitter's XML
636        representation of it.
637
638        The topology XML looks like:
639            <experiment>
640                <nodes>
641                    <node><vname></vname><ips>ip1:ip2</ips></node>
642                </nodes>
643                <lans>
644                    <lan>
645                        <vname></vname><vnode></vnode><ip></ip>
646                        <bandwidth></bandwidth><member>node:port</member>
647                    </lan>
648                </lans>
649        """
650        class topo_parse:
651            """
652            Parse the topology XML and create the dats structure.
653            """
654            def __init__(self):
655                # Typing of the subelements for data conversion
656                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
657                self.int_subelements = ( 'bandwidth',)
658                self.float_subelements = ( 'delay',)
659                # The final data structure
660                self.nodes = [ ]
661                self.lans =  [ ]
662                self.topo = { \
663                        'node': self.nodes,\
664                        'lan' : self.lans,\
665                    }
666                self.element = { }  # Current element being created
667                self.chars = ""     # Last text seen
668
669            def end_element(self, name):
670                # After each sub element the contents is added to the current
671                # element or to the appropriate list.
672                if name == 'node':
673                    self.nodes.append(self.element)
674                    self.element = { }
675                elif name == 'lan':
676                    self.lans.append(self.element)
677                    self.element = { }
678                elif name in self.str_subelements:
679                    self.element[name] = self.chars
680                    self.chars = ""
681                elif name in self.int_subelements:
682                    self.element[name] = int(self.chars)
683                    self.chars = ""
684                elif name in self.float_subelements:
685                    self.element[name] = float(self.chars)
686                    self.chars = ""
687
688            def found_chars(self, data):
689                self.chars += data.rstrip()
690
691
692        tp = topo_parse();
693        parser = xml.parsers.expat.ParserCreate()
694        parser.EndElementHandler = tp.end_element
695        parser.CharacterDataHandler = tp.found_chars
696
697        parser.Parse(str)
698
699        return tp.topo
700       
701
702    def genviz(self, topo):
703        """
704        Generate the visualization the virtual topology
705        """
706
707        neato = "/usr/local/bin/neato"
708        # These are used to parse neato output and to create the visualization
709        # file.
710        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
711        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
712                "%s</type></node>"
713
714        try:
715            # Node names
716            nodes = [ n['vname'] for n in topo['node'] ]
717            topo_lans = topo['lan']
718        except KeyError:
719            raise service_error(service_error.internal, "Bad topology")
720
721        lans = { }
722        links = { }
723
724        # Walk through the virtual topology, organizing the connections into
725        # 2-node connections (links) and more-than-2-node connections (lans).
726        # When a lan is created, it's added to the list of nodes (there's a
727        # node in the visualization for the lan).
728        for l in topo_lans:
729            if links.has_key(l['vname']):
730                if len(links[l['vname']]) < 2:
731                    links[l['vname']].append(l['vnode'])
732                else:
733                    nodes.append(l['vname'])
734                    lans[l['vname']] = links[l['vname']]
735                    del links[l['vname']]
736                    lans[l['vname']].append(l['vnode'])
737            elif lans.has_key(l['vname']):
738                lans[l['vname']].append(l['vnode'])
739            else:
740                links[l['vname']] = [ l['vnode'] ]
741
742
743        # Open up a temporary file for dot to turn into a visualization
744        try:
745            df, dotname = tempfile.mkstemp()
746            dotfile = os.fdopen(df, 'w')
747        except IOError:
748            raise service_error(service_error.internal,
749                    "Failed to open file in genviz")
750
751        # Generate a dot/neato input file from the links, nodes and lans
752        try:
753            print >>dotfile, "graph G {"
754            for n in nodes:
755                print >>dotfile, '\t"%s"' % n
756            for l in links.keys():
757                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
758            for l in lans.keys():
759                for n in lans[l]:
760                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
761            print >>dotfile, "}"
762            dotfile.close()
763        except TypeError:
764            raise service_error(service_error.internal,
765                    "Single endpoint link in vtopo")
766        except IOError:
767            raise service_error(service_error.internal, "Cannot write dot file")
768
769        # Use dot to create a visualization
770        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
771                '-Gpack=true', dotname], stdout=PIPE)
772
773        # Translate dot to vis format
774        vis_nodes = [ ]
775        vis = { 'node': vis_nodes }
776        for line in dot.stdout:
777            m = vis_re.match(line)
778            if m:
779                vn = m.group(1)
780                vis_node = {'name': vn, \
781                        'x': float(m.group(2)),\
782                        'y' : float(m.group(3)),\
783                    }
784                if vn in links.keys() or vn in lans.keys():
785                    vis_node['type'] = 'lan'
786                else:
787                    vis_node['type'] = 'node'
788                vis_nodes.append(vis_node)
789        rv = dot.wait()
790
791        os.remove(dotname)
792        if rv == 0 : return vis
793        else: return None
794
795    def get_access(self, tb, nodes, user, tbparam, master, export_project):
796        """
797        Get access to testbed through fedd and set the parameters for that tb
798        """
799
800        translate_attr = {
801            'slavenodestartcmd': 'expstart',
802            'slaveconnectorstartcmd': 'gwstart',
803            'masternodestartcmd': 'mexpstart',
804            'masterconnectorstartcmd': 'mgwstart',
805            'connectorimage': 'gwimage',
806            'connectortype': 'gwtype',
807            'tunnelcfg': 'tun',
808            'smbshare': 'smbshare',
809        }
810
811        uri = self.tbmap.get(tb, None)
812        if not uri:
813            raise service_error(serice_error.server_config, 
814                    "Unknown testbed: %s" % tb)
815
816        # currently this lumps all users into one service access group
817        service_keys = [ a for u in user \
818                for a in u.get('access', []) \
819                    if a.has_key('sshPubkey')]
820
821        if len(service_keys) == 0:
822            raise service_error(service_error.req, 
823                    "Must have at least one SSH pubkey for services")
824
825
826        # The basic request
827        req = {\
828                'destinationTestbed' : { 'uri' : uri },
829                'user':  user,
830                'allocID' : { 'localname': 'test' },
831                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
832                'serviceAccess' : service_keys
833            }
834
835        print "service %s" % service_keys
836        print "ssh pubkey %s" % self.ssh_pubkey
837        print "ssh pubkey file %s" % self.ssh_pubkey_file
838
839        if tb == master:
840            # NB, the export_project parameter is a dict that includes
841            # the type
842            req['exportProject'] = export_project
843
844        # node resources if any
845        if nodes != None and len(nodes) > 0:
846            rnodes = [ ]
847            for n in nodes:
848                rn = { }
849                image, hw, count = n.split(":")
850                if image: rn['image'] = [ image ]
851                if hw: rn['hardware'] = [ hw ]
852                if count: rn['count'] = int(count)
853                rnodes.append(rn)
854            req['resources']= { }
855            req['resources']['node'] = rnodes
856
857        r = self.call_RequestAccess(uri, req, 
858                self.cert_file, self.cert_pwd, self.trusted_certs)
859
860        if r.has_key('RequestAccessResponseBody'):
861            r = r['RequestAccessResponseBody']
862        else:
863            raise service_error(service_error.protocol,
864                    "Bad proxy response")
865
866        e = r['emulab']
867        p = e['project']
868        tbparam[tb] = { 
869                "boss": e['boss'],
870                "host": e['ops'],
871                "domain": e['domain'],
872                "fs": e['fileServer'],
873                "eventserver": e['eventServer'],
874                "project": unpack_id(p['name']),
875                "emulab" : e,
876                "allocID" : r['allocID'],
877                }
878        # Make the testbed name be the label the user applied
879        p['testbed'] = {'localname': tb }
880
881        for u in p['user']:
882            tbparam[tb]['user'] = unpack_id(u['userID'])
883
884        for a in e['fedAttr']:
885            if a['attribute']:
886                key = translate_attr.get(a['attribute'].lower(), None)
887                if key:
888                    tbparam[tb][key]= a['value']
889       
890    def release_access(self, tb, aid):
891        """
892        Release access to testbed through fedd
893        """
894
895        uri = self.tbmap.get(tb, None)
896        if not uri:
897            raise service_error(serice_error.server_config, 
898                    "Unknown testbed: %s" % tb)
899
900        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
901                self.cert_file, self.cert_pwd, self.trusted_certs)
902
903        # better error coding
904
905    def remote_splitter(self, uri, desc, master):
906
907        req = {
908                'description' : { 'ns2description': desc },
909                'master': master,
910                'include_fedkit': bool(self.fedkit)
911            }
912
913        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
914                self.trusted_certs)
915
916        if r.has_key('Ns2SplitResponseBody'):
917            r = r['Ns2SplitResponseBody']
918            if r.has_key('output'):
919                return r['output'].splitlines()
920            else:
921                raise service_error(service_error.protocol, 
922                        "Bad splitter response (no output)")
923        else:
924            raise service_error(service_error.protocol, "Bad splitter response")
925       
926    class current_testbed:
927        """
928        Object for collecting the current testbed description.  The testbed
929        description is saved to a file with the local testbed variables
930        subsittuted line by line.
931        """
932        def __init__(self, eid, tmpdir, fedkit):
933            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
934            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
935            self.current_testbed = None
936            self.testbed_file = None
937
938            self.def_expstart = \
939                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
940            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
941            self.def_gwstart = \
942                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
943            self.def_mgwstart = \
944                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
945            self.def_gwimage = "FBSD61-TUNNEL2";
946            self.def_gwtype = "pc";
947
948            self.eid = eid
949            self.tmpdir = tmpdir
950            self.fedkit = fedkit
951
952        def __call__(self, line, master, allocated, tbparams):
953            # Capture testbed topology descriptions
954            if self.current_testbed == None:
955                m = self.begin_testbed.match(line)
956                if m != None:
957                    self.current_testbed = m.group(1)
958                    if self.current_testbed == None:
959                        raise service_error(service_error.req,
960                                "Bad request format (unnamed testbed)")
961                    allocated[self.current_testbed] = \
962                            allocated.get(self.current_testbed,0) + 1
963                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
964                    if not os.path.exists(tb_dir):
965                        try:
966                            os.mkdir(tb_dir)
967                        except IOError:
968                            raise service_error(service_error.internal,
969                                    "Cannot create %s" % tb_dir)
970                    try:
971                        self.testbed_file = open("%s/%s.%s.tcl" %
972                                (tb_dir, self.eid, self.current_testbed), 'w')
973                    except IOError:
974                        self.testbed_file = None
975                    return True
976                else: return False
977            else:
978                m = self.end_testbed.match(line)
979                if m != None:
980                    if m.group(1) != self.current_testbed:
981                        raise service_error(service_error.internal, 
982                                "Mismatched testbed markers!?")
983                    if self.testbed_file != None: 
984                        self.testbed_file.close()
985                        self.testbed_file = None
986                    self.current_testbed = None
987                elif self.testbed_file:
988                    # Substitute variables and put the line into the local
989                    # testbed file.
990                    gwtype = tbparams[self.current_testbed].get('gwtype', 
991                            self.def_gwtype)
992                    gwimage = tbparams[self.current_testbed].get('gwimage', 
993                            self.def_gwimage)
994                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
995                            self.def_mgwstart)
996                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
997                            self.def_mexpstart)
998                    gwstart = tbparams[self.current_testbed].get('gwstart', 
999                            self.def_gwstart)
1000                    expstart = tbparams[self.current_testbed].get('expstart', 
1001                            self.def_expstart)
1002                    project = tbparams[self.current_testbed].get('project')
1003                    line = re.sub("GWTYPE", gwtype, line)
1004                    line = re.sub("GWIMAGE", gwimage, line)
1005                    if self.current_testbed == master:
1006                        line = re.sub("GWSTART", mgwstart, line)
1007                        line = re.sub("EXPSTART", mexpstart, line)
1008                    else:
1009                        line = re.sub("GWSTART", gwstart, line)
1010                        line = re.sub("EXPSTART", expstart, line)
1011                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1012                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1013                    line = re.sub("EID", self.eid, line)
1014                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1015                            (project, self.eid), line)
1016                    if self.fedkit:
1017                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1018                                line)
1019                    print >>self.testbed_file, line
1020                return True
1021
1022    class allbeds:
1023        """
1024        Process the Allbeds section.  Get access to each federant and save the
1025        parameters in tbparams
1026        """
1027        def __init__(self, get_access):
1028            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1029            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1030            self.in_allbeds = False
1031            self.get_access = get_access
1032
1033        def __call__(self, line, user, tbparams, master, export_project):
1034            # Testbed access parameters
1035            if not self.in_allbeds:
1036                if self.begin_allbeds.match(line):
1037                    self.in_allbeds = True
1038                    return True
1039                else:
1040                    return False
1041            else:
1042                if self.end_allbeds.match(line):
1043                    self.in_allbeds = False
1044                else:
1045                    nodes = line.split('|')
1046                    tb = nodes.pop(0)
1047                    self.get_access(tb, nodes, user, tbparams, master,
1048                            export_project)
1049                return True
1050
1051    class gateways:
1052        def __init__(self, eid, master, tmpdir, gw_pubkey,
1053                gw_secretkey, copy_file, fedkit):
1054            self.begin_gateways = \
1055                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1056            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1057            self.current_gateways = None
1058            self.control_gateway = None
1059            self.active_end = { }
1060
1061            self.eid = eid
1062            self.master = master
1063            self.tmpdir = tmpdir
1064            self.gw_pubkey_base = gw_pubkey
1065            self.gw_secretkey_base = gw_secretkey
1066
1067            self.copy_file = copy_file
1068            self.fedkit = fedkit
1069
1070
1071        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1072                active_end, tbparams, dtb, myname, desthost, type):
1073            """
1074            Produce a gateway configuration file from a gateways line.
1075            """
1076
1077            sproject = tbparams[gw].get('project', 'project')
1078            dproject = tbparams[dtb].get('project', 'project')
1079            sdomain = ".%s.%s%s" % (eid, sproject,
1080                    tbparams[gw].get('domain', ".example.com"))
1081            ddomain = ".%s.%s%s" % (eid, dproject,
1082                    tbparams[dtb].get('domain', ".example.com"))
1083            boss = tbparams[master].get('boss', "boss")
1084            fs = tbparams[master].get('fs', "fs")
1085            event_server = "%s%s" % \
1086                    (tbparams[gw].get('eventserver', "event_server"),
1087                            tbparams[gw].get('domain', "example.com"))
1088            remote_event_server = "%s%s" % \
1089                    (tbparams[dtb].get('eventserver', "event_server"),
1090                            tbparams[dtb].get('domain', "example.com"))
1091            seer_control = "%s%s" % \
1092                    (tbparams[gw].get('control', "control"), sdomain)
1093
1094            if self.fedkit:
1095                remote_script_dir = "/usr/local/federation/bin"
1096                local_script_dir = "/usr/local/federation/bin"
1097            else:
1098                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1099                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1100
1101            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1102            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1103            tunnel_cfg = tbparams[gw].get("tun", "false")
1104
1105            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1106            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1107
1108            # translate to lower case so the `hostname` hack for specifying
1109            # configuration files works.
1110            conf_file = conf_file.lower();
1111            remote_conf_file = remote_conf_file.lower();
1112
1113            if dtb == master:
1114                active = "false"
1115            elif gw == master:
1116                active = "true"
1117            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1118                active = "false"
1119            else:
1120                active_end['%s-%s' % (gw, dtb)] = 1
1121                active = "true"
1122
1123            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1124            print >>gwconfig, "Active: %s" % active
1125            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1126            print >>gwconfig, "BossName: %s" % boss
1127            print >>gwconfig, "FsName: %s" % fs
1128            print >>gwconfig, "EventServerName: %s" % event_server
1129            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1130            print >>gwconfig, "SeerControl: %s" % seer_control
1131            print >>gwconfig, "Type: %s" % type
1132            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1133            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1134                    local_script_dir
1135            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1136            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1137            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1138                    (remote_conf_dir, remote_conf_file)
1139            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1140            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1141            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1142            gwconfig.close()
1143
1144            return active == "true"
1145
1146        def __call__(self, line, allocated, tbparams):
1147            # Process gateways
1148            if not self.current_gateways:
1149                m = self.begin_gateways.match(line)
1150                if m:
1151                    self.current_gateways = m.group(1)
1152                    if allocated.has_key(self.current_gateways):
1153                        # This test should always succeed
1154                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1155                        if not os.path.exists(tb_dir):
1156                            try:
1157                                os.mkdir(tb_dir)
1158                            except IOError:
1159                                raise service_error(service_error.internal,
1160                                        "Cannot create %s" % tb_dir)
1161                    else:
1162                        # XXX
1163                        self.log.error("[gateways]: Ignoring gateways for " + \
1164                                "unknown testbed %s" % self.current_gateways)
1165                        self.current_gateways = None
1166                    return True
1167                else:
1168                    return False
1169            else:
1170                m = self.end_gateways.match(line)
1171                if m :
1172                    if m.group(1) != self.current_gateways:
1173                        raise service_error(service_error.internal,
1174                                "Mismatched gateway markers!?")
1175                    if self.control_gateway:
1176                        try:
1177                            cc = open("%s/%s/client.conf" %
1178                                    (self.tmpdir, self.current_gateways), 'w')
1179                            print >>cc, "ControlGateway: %s" % \
1180                                    self.control_gateway
1181                            if tbparams[self.master].has_key('smbshare'):
1182                                print >>cc, "SMBSHare: %s" % \
1183                                        tbparams[self.master]['smbshare']
1184                            print >>cc, "ProjectUser: %s" % \
1185                                    tbparams[self.master]['user']
1186                            print >>cc, "ProjectName: %s" % \
1187                                    tbparams[self.master]['project']
1188                            cc.close()
1189                        except IOError:
1190                            raise service_error(service_error.internal,
1191                                    "Error creating client config")
1192                        try:
1193                            cc = open("%s/%s/seer.conf" %
1194                                    (self.tmpdir, self.current_gateways),
1195                                    'w')
1196                            if self.current_gateways != self.master:
1197                                print >>cc, "ControlNode: %s" % \
1198                                        self.control_gateway
1199                            print >>cc, "ExperimentID: %s/%s" % \
1200                                    ( tbparams[self.master]['project'], \
1201                                    self.eid )
1202                            cc.close()
1203                        except IOError:
1204                            raise service_error(service_error.internal,
1205                                    "Error creating seer config")
1206                    else:
1207                        debug.error("[gateways]: No control gateway for %s" %\
1208                                    self.current_gateways)
1209                    self.current_gateways = None
1210                else:
1211                    dtb, myname, desthost, type = line.split(" ")
1212
1213                    if type == "control" or type == "both":
1214                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1215                                self.eid, 
1216                                tbparams[self.current_gateways]['project'],
1217                                tbparams[self.current_gateways]['domain'])
1218                    try:
1219                        active = self.gateway_conf_file(self.current_gateways,
1220                                self.master, self.eid, self.gw_pubkey_base,
1221                                self.gw_secretkey_base,
1222                                self.active_end, tbparams, dtb, myname,
1223                                desthost, type)
1224                    except IOError, e:
1225                        raise service_error(service_error.internal,
1226                                "Failed to write config file for %s" % \
1227                                        self.current_gateway)
1228           
1229                    gw_pubkey = "%s/keys/%s" % \
1230                            (self.tmpdir, self.gw_pubkey_base)
1231                    gw_secretkey = "%s/keys/%s" % \
1232                            (self.tmpdir, self.gw_secretkey_base)
1233
1234                    pkfile = "%s/%s/%s" % \
1235                            ( self.tmpdir, self.current_gateways, 
1236                                    self.gw_pubkey_base)
1237                    skfile = "%s/%s/%s" % \
1238                            ( self.tmpdir, self.current_gateways, 
1239                                    self.gw_secretkey_base)
1240
1241                    if not os.path.exists(pkfile):
1242                        try:
1243                            self.copy_file(gw_pubkey, pkfile)
1244                        except IOError:
1245                            service_error(service_error.internal,
1246                                    "Failed to copy pubkey file")
1247
1248                    if active and not os.path.exists(skfile):
1249                        try:
1250                            self.copy_file(gw_secretkey, skfile)
1251                        except IOError:
1252                            service_error(service_error.internal,
1253                                    "Failed to copy secretkey file")
1254                return True
1255
1256    class shunt_to_file:
1257        """
1258        Simple class to write data between two regexps to a file.
1259        """
1260        def __init__(self, begin, end, filename):
1261            """
1262            Begin shunting on a match of begin, stop on end, send data to
1263            filename.
1264            """
1265            self.begin = re.compile(begin)
1266            self.end = re.compile(end)
1267            self.in_shunt = False
1268            self.file = None
1269            self.filename = filename
1270
1271        def __call__(self, line):
1272            """
1273            Call this on each line in the input that may be shunted.
1274            """
1275            if not self.in_shunt:
1276                if self.begin.match(line):
1277                    self.in_shunt = True
1278                    try:
1279                        self.file = open(self.filename, "w")
1280                    except:
1281                        self.file = None
1282                        raise
1283                    return True
1284                else:
1285                    return False
1286            else:
1287                if self.end.match(line):
1288                    if self.file: 
1289                        self.file.close()
1290                        self.file = None
1291                    self.in_shunt = False
1292                else:
1293                    if self.file:
1294                        print >>self.file, line
1295                return True
1296
1297    class shunt_to_list:
1298        """
1299        Same interface as shunt_to_file.  Data collected in self.list, one list
1300        element per line.
1301        """
1302        def __init__(self, begin, end):
1303            self.begin = re.compile(begin)
1304            self.end = re.compile(end)
1305            self.in_shunt = False
1306            self.list = [ ]
1307       
1308        def __call__(self, line):
1309            if not self.in_shunt:
1310                if self.begin.match(line):
1311                    self.in_shunt = True
1312                    return True
1313                else:
1314                    return False
1315            else:
1316                if self.end.match(line):
1317                    self.in_shunt = False
1318                else:
1319                    self.list.append(line)
1320                return True
1321
1322    class shunt_to_string:
1323        """
1324        Same interface as shunt_to_file.  Data collected in self.str, all in
1325        one string.
1326        """
1327        def __init__(self, begin, end):
1328            self.begin = re.compile(begin)
1329            self.end = re.compile(end)
1330            self.in_shunt = False
1331            self.str = ""
1332       
1333        def __call__(self, line):
1334            if not self.in_shunt:
1335                if self.begin.match(line):
1336                    self.in_shunt = True
1337                    return True
1338                else:
1339                    return False
1340            else:
1341                if self.end.match(line):
1342                    self.in_shunt = False
1343                else:
1344                    self.str += line
1345                return True
1346
1347    def create_experiment(self, req, fid):
1348        """
1349        The external interface to experiment creation called from the
1350        dispatcher.
1351
1352        Creates a working directory, splits the incoming description using the
1353        splitter script and parses out the avrious subsections using the
1354        lcasses above.  Once each sub-experiment is created, use pooled threads
1355        to instantiate them and start it all up.
1356        """
1357        try:
1358            tmpdir = tempfile.mkdtemp(prefix="split-")
1359        except IOError:
1360            raise service_error(service_error.internal, "Cannot create tmp dir")
1361
1362        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1363        gw_secretkey_base = "fed.%s" % self.ssh_type
1364        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1365        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1366        tclfile = tmpdir + "/experiment.tcl"
1367        tbparams = { }
1368
1369        pid = "dummy"
1370        gid = "dummy"
1371        # XXX
1372        fail_soft = False
1373
1374        try:
1375            os.mkdir(tmpdir+"/keys")
1376        except OSError:
1377            raise service_error(service_error.internal,
1378                    "Can't make temporary dir")
1379
1380        req = req.get('CreateRequestBody', None)
1381        if not req:
1382            raise service_error(service_error.req,
1383                    "Bad request format (no CreateRequestBody)")
1384        # The tcl parser needs to read a file so put the content into that file
1385        descr=req.get('experimentdescription', None)
1386        if descr:
1387            file_content=descr.get('ns2description', None)
1388            if file_content:
1389                try:
1390                    f = open(tclfile, 'w')
1391                    f.write(file_content)
1392                    f.close()
1393                except IOError:
1394                    raise service_error(service_error.internal,
1395                            "Cannot write temp experiment description")
1396            else:
1397                raise service_error(service_error.req, 
1398                        "Only ns2descriptions supported")
1399        else:
1400            raise service_error(service_error.req, "No experiment description")
1401
1402        if req.has_key('experimentID') and \
1403                req['experimentID'].has_key('localname'):
1404            eid = req['experimentID']['localname']
1405            self.state_lock.acquire()
1406            while (self.state.has_key(eid)):
1407                eid += random.choice(string.ascii_letters)
1408            # To avoid another thread picking this localname
1409            self.state[eid] = "placeholder"
1410            self.state_lock.release()
1411        else:
1412            eid = self.exp_stem
1413            for i in range(0,5):
1414                eid += random.choice(string.ascii_letters)
1415            self.state_lock.acquire()
1416            while (self.state.has_key(eid)):
1417                eid = self.exp_stem
1418                for i in range(0,5):
1419                    eid += random.choice(string.ascii_letters)
1420            # To avoid another thread picking this localname
1421            self.state[eid] = "placeholder"
1422            self.state_lock.release()
1423
1424        try:
1425            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1426        except ValueError:
1427            raise service_error(service_error.server_config, 
1428                    "Bad key type (%s)" % self.ssh_type)
1429
1430        user = req.get('user', None)
1431        if user == None:
1432            raise service_error(service_error.req, "No user")
1433
1434        master = req.get('master', None)
1435        if not master:
1436            raise service_error(service_error.req, "No master testbed label")
1437        export_project = req.get('exportProject', None)
1438        if not export_project:
1439            raise service_error(service_error.req, "No export project")
1440       
1441        if self.splitter_url:
1442            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1443            split_data = self.remote_splitter(self.splitter_url, file_content,
1444                    master)
1445        else:
1446            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1447                str(self.muxmax), '-m', master]
1448
1449            if self.fedkit:
1450                tclcmd.append('-k')
1451
1452            tclcmd.extend([pid, gid, eid, tclfile])
1453
1454            self.log.debug("running local splitter %s", " ".join(tclcmd))
1455            tclparser = Popen(tclcmd, stdout=PIPE)
1456            split_data = tclparser.stdout
1457
1458        allocated = { }     # Testbeds we can access
1459        started = { }       # Testbeds where a sub-experiment started
1460                            # successfully
1461
1462        # Objects to parse the splitter output (defined above)
1463        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1464        parse_allbeds = self.allbeds(self.get_access)
1465        parse_gateways = self.gateways(eid, master, tmpdir,
1466                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1467        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1468                    "^#\s+End\s+Vtopo")
1469        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1470                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1471        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1472                "^#\s+End\s+tarfiles")
1473        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1474                "^#\s+End\s+rpms")
1475
1476        # Worling on the split data
1477        for line in split_data:
1478            line = line.rstrip()
1479            if parse_current_testbed(line, master, allocated, tbparams):
1480                continue
1481            elif parse_allbeds(line, user, tbparams, master, export_project):
1482                continue
1483            elif parse_gateways(line, allocated, tbparams):
1484                continue
1485            elif parse_vtopo(line):
1486                continue
1487            elif parse_hostnames(line):
1488                continue
1489            elif parse_tarfiles(line):
1490                continue
1491            elif parse_rpms(line):
1492                continue
1493            else:
1494                raise service_error(service_error.internal, 
1495                        "Bad tcl parse? %s" % line)
1496
1497        # Virtual topology and visualization
1498        vtopo = self.gentopo(parse_vtopo.str)
1499        if not vtopo:
1500            raise service_error(service_error.internal, 
1501                    "Failed to generate virtual topology")
1502
1503        vis = self.genviz(vtopo)
1504        if not vis:
1505            raise service_error(service_error.internal, 
1506                    "Failed to generate visualization")
1507
1508        # save federant information
1509        for k in allocated.keys():
1510            tbparams[k]['federant'] = {\
1511                    'name': [ { 'localname' : eid} ],\
1512                    'emulab': tbparams[k]['emulab'],\
1513                    'allocID' : tbparams[k]['allocID'],\
1514                    'master' : k == master,\
1515                }
1516
1517
1518        # Copy tarfiles and rpms needed at remote sites into a staging area
1519        try:
1520            if self.fedkit:
1521                parse_tarfiles.list.append(self.fedkit)
1522            for t in parse_tarfiles.list:
1523                if not os.path.exists("%s/tarfiles" % tmpdir):
1524                    os.mkdir("%s/tarfiles" % tmpdir)
1525                self.copy_file(t, "%s/tarfiles/%s" % \
1526                        (tmpdir, os.path.basename(t)))
1527            for r in parse_rpms.list:
1528                if not os.path.exists("%s/rpms" % tmpdir):
1529                    os.mkdir("%s/rpms" % tmpdir)
1530                self.copy_file(r, "%s/rpms/%s" % \
1531                        (tmpdir, os.path.basename(r)))
1532        except IOError, e:
1533            raise service_error(service_error.internal, 
1534                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1535
1536        thread_pool_info = self.thread_pool()
1537        threads = [ ]
1538
1539        for tb in [ k for k in allocated.keys() if k != master]:
1540            # Wait until we have a free slot to start the next testbed load
1541            thread_pool_info.acquire()
1542            while thread_pool_info.started - \
1543                    thread_pool_info.terminated >= self.nthreads:
1544                thread_pool_info.wait()
1545            thread_pool_info.release()
1546
1547            # Create and start a thread to start the segment, and save it to
1548            # get the return value later
1549            t  = self.pooled_thread(target=self.start_segment, 
1550                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1551                    pdata=thread_pool_info, trace_file=self.trace_file)
1552            threads.append(t)
1553            t.start()
1554
1555        # Wait until all finish (the first clause of the while is to make sure
1556        # one starts)
1557        thread_pool_info.acquire()
1558        while thread_pool_info.started == 0 or \
1559                thread_pool_info.started > thread_pool_info.terminated:
1560            thread_pool_info.wait()
1561        thread_pool_info.release()
1562
1563        # If none failed, start the master
1564        failed = [ t.getName() for t in threads if not t.rv ]
1565
1566        if len(failed) == 0:
1567            if not self.start_segment(master, eid, tbparams, tmpdir):
1568                failed.append(master)
1569
1570        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1571        # If one failed clean up, unless fail_soft is set
1572        if failed:
1573            if not fail_soft:
1574                for tb in succeeded:
1575                    self.stop_segment(tb, eid, tbparams)
1576                # Remove the placeholder
1577                self.state_lock.acquire()
1578                del self.state[eid]
1579                self.state_lock.release()
1580
1581                raise service_error(service_error.federant,
1582                    "Swap in failed on %s" % ",".join(failed))
1583        else:
1584            self.log.info("[start_segment]: Experiment %s started" % eid)
1585
1586        # Generate an ID for the experiment (slice) and a certificate that the
1587        # allocator can use to prove they own it.  We'll ship it back through
1588        # the encrypted connection.
1589        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1590
1591        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1592
1593        # Walk up tmpdir, deleting as we go
1594        for path, dirs, files in os.walk(tmpdir, topdown=False):
1595            for f in files:
1596                os.remove(os.path.join(path, f))
1597            for d in dirs:
1598                os.rmdir(os.path.join(path, d))
1599        os.rmdir(tmpdir)
1600
1601        # The deepcopy prevents the allocation ID and other binaries from being
1602        # translated into other formats
1603        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1604                for tb in tbparams.keys() \
1605                    if tbparams[tb].has_key('federant') ],\
1606                    'vtopo': vtopo,\
1607                    'vis' : vis,
1608                    'experimentID' : [\
1609                            { 'fedid': copy.copy(expid) }, \
1610                            { 'localname': eid },\
1611                        ],\
1612                    'experimentAccess': { 'X509' : expcert },\
1613                }
1614
1615        # Insert the experiment into our state and update the disk copy
1616        self.state_lock.acquire()
1617        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1618                for tb in tbparams.keys() \
1619                    if tbparams[tb].has_key('federant') ],\
1620                    'vtopo': vtopo,\
1621                    'vis' : vis,
1622                    'experimentID' : [\
1623                            { 'fedid': expid }, { 'localname': eid },\
1624                        ],\
1625                }
1626        self.state[eid] = self.state[expid]
1627        if self.state_filename: self.write_state()
1628        self.state_lock.release()
1629
1630        if not failed:
1631            return resp
1632        else:
1633            raise service_error(service_error.partial, \
1634                    "Partial swap in on %s" % ",".join(succeeded))
1635
1636
1637    def get_vtopo(self, req, fid):
1638        """
1639        Return the stored virtual topology for this experiment
1640        """
1641        rv = None
1642
1643        req = req.get('VtopoRequestBody', None)
1644        if not req:
1645            raise service_error(service_error.req,
1646                    "Bad request format (no VtopoRequestBody)")
1647        exp = req.get('experiment', None)
1648        if exp:
1649            if exp.has_key('fedid'):
1650                key = exp['fedid']
1651                keytype = "fedid"
1652            elif exp.has_key('localname'):
1653                key = exp['localname']
1654                keytype = "localname"
1655            else:
1656                raise service_error(service_error.req, "Unknown lookup type")
1657        else:
1658            raise service_error(service_error.req, "No request?")
1659
1660        self.state_lock.acquire()
1661        if self.state.has_key(key):
1662            rv = { 'experiment' : {keytype: key },\
1663                    'vtopo': self.state[key]['vtopo'],\
1664                }
1665        self.state_lock.release()
1666
1667        if rv: return rv
1668        else: raise service_error(service_error.req, "No such experiment")
1669
1670    def get_vis(self, req, fid):
1671        """
1672        Return the stored visualization for this experiment
1673        """
1674        rv = None
1675
1676        req = req.get('VisRequestBody', None)
1677        if not req:
1678            raise service_error(service_error.req,
1679                    "Bad request format (no VisRequestBody)")
1680        exp = req.get('experiment', None)
1681        if exp:
1682            if exp.has_key('fedid'):
1683                key = exp['fedid']
1684                keytype = "fedid"
1685            elif exp.has_key('localname'):
1686                key = exp['localname']
1687                keytype = "localname"
1688            else:
1689                raise service_error(service_error.req, "Unknown lookup type")
1690        else:
1691            raise service_error(service_error.req, "No request?")
1692
1693        self.state_lock.acquire()
1694        if self.state.has_key(key):
1695            rv =  { 'experiment' : {keytype: key },\
1696                    'vis': self.state[key]['vis'],\
1697                    }
1698        self.state_lock.release()
1699
1700        if rv: return rv
1701        else: raise service_error(service_error.req, "No such experiment")
1702
1703    def get_info(self, req, fid):
1704        """
1705        Return all the stored info about this experiment
1706        """
1707        rv = None
1708
1709        req = req.get('InfoRequestBody', None)
1710        if not req:
1711            raise service_error(service_error.req,
1712                    "Bad request format (no VisRequestBody)")
1713        exp = req.get('experiment', None)
1714        if exp:
1715            if exp.has_key('fedid'):
1716                key = exp['fedid']
1717                keytype = "fedid"
1718            elif exp.has_key('localname'):
1719                key = exp['localname']
1720                keytype = "localname"
1721            else:
1722                raise service_error(service_error.req, "Unknown lookup type")
1723        else:
1724            raise service_error(service_error.req, "No request?")
1725
1726        # The state may be massaged by the service function that called
1727        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1728        # state.
1729        self.state_lock.acquire()
1730        if self.state.has_key(key):
1731            rv = copy.deepcopy(self.state[key])
1732        self.state_lock.release()
1733
1734        if rv: return rv
1735        else: raise service_error(service_error.req, "No such experiment")
1736
1737
1738    def terminate_experiment(self, req, fid):
1739        """
1740        Swap this experiment out on the federants and delete the shared
1741        information
1742        """
1743        tbparams = { }
1744        req = req.get('TerminateRequestBody', None)
1745        if not req:
1746            raise service_error(service_error.req,
1747                    "Bad request format (no TerminateRequestBody)")
1748        exp = req.get('experiment', None)
1749        if exp:
1750            if exp.has_key('fedid'):
1751                key = exp['fedid']
1752                keytype = "fedid"
1753            elif exp.has_key('localname'):
1754                key = exp['localname']
1755                keytype = "localname"
1756            else:
1757                raise service_error(service_error.req, "Unknown lookup type")
1758        else:
1759            raise service_error(service_error.req, "No request?")
1760
1761        self.state_lock.acquire()
1762        fed_exp = self.state.get(key, None)
1763
1764        if fed_exp:
1765            # This branch of the conditional holds the lock to generate a
1766            # consistent temporary tbparams variable to deallocate experiments.
1767            # It releases the lock to do the deallocations and reacquires it to
1768            # remove the experiment state when the termination is complete.
1769            ids = []
1770            #  experimentID is a list of dicts that are self-describing
1771            #  identifiers.  This finds all the fedids and localnames - the
1772            #  keys of self.state - and puts them into ids.
1773            for id in fed_exp.get('experimentID', []):
1774                if id.has_key('fedid'): ids.append(id['fedid'])
1775                if id.has_key('localname'): ids.append(id['localname'])
1776
1777            # Construct enough of the tbparams to make the stop_segment calls
1778            # work
1779            for fed in fed_exp['federant']:
1780                try:
1781                    for e in fed['name']:
1782                        eid = e.get('localname', None)
1783                        if eid: break
1784                    else:
1785                        continue
1786
1787                    p = fed['emulab']['project']
1788
1789                    project = p['name']['localname']
1790                    tb = p['testbed']['localname']
1791                    user = p['user'][0]['userID']['localname']
1792
1793                    domain = fed['emulab']['domain']
1794                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1795                    aid = fed['allocID']
1796                except KeyError, e:
1797                    continue
1798                tbparams[tb] = {\
1799                        'user': user,\
1800                        'domain': domain,\
1801                        'project': project,\
1802                        'host': host,\
1803                        'eid': eid,\
1804                        'aid': aid,\
1805                    }
1806            self.state_lock.release()
1807
1808            # Stop everyone.
1809            for tb in tbparams.keys():
1810                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1811
1812            # release the allocations
1813            for tb in tbparams.keys():
1814                self.release_access(tb, tbparams[tb]['aid'])
1815
1816            # Remove the terminated experiment
1817            self.state_lock.acquire()
1818            for id in ids:
1819                if self.state.has_key(id): del self.state[id]
1820
1821            if self.state_filename: self.write_state()
1822            self.state_lock.release()
1823
1824            return { 'experiment': exp }
1825        else:
1826            # Don't forget to release the lock
1827            self.state_lock.release()
1828            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.