source: fedd/fedd/experiment_control.py @ 6a0c9f4

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 6a0c9f4 was 6a0c9f4, checked in by Ted Faber <faber@…>, 15 years ago

More namespace cleanup

  • Property mode set to 100644
File size: 60.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42   
43    class thread_pool:
44        """
45        A class to keep track of a set of threads all invoked for the same
46        task.  Manages the mutual exclusion of the states.
47        """
48        def __init__(self):
49            """
50            Start a pool.
51            """
52            self.changed = Condition()
53            self.started = 0
54            self.terminated = 0
55
56        def acquire(self):
57            """
58            Get the pool's lock.
59            """
60            self.changed.acquire()
61
62        def release(self):
63            """
64            Release the pool's lock.
65            """
66            self.changed.release()
67
68        def wait(self, timeout = None):
69            """
70            Wait for a pool thread to start or stop.
71            """
72            self.changed.wait(timeout)
73
74        def start(self):
75            """
76            Called by a pool thread to report starting.
77            """
78            self.changed.acquire()
79            self.started += 1
80            self.changed.notifyAll()
81            self.changed.release()
82
83        def terminate(self):
84            """
85            Called by a pool thread to report finishing.
86            """
87            self.changed.acquire()
88            self.terminated += 1
89            self.changed.notifyAll()
90            self.changed.release()
91
92        def clear(self):
93            """
94            Clear all pool data.
95            """
96            self.changed.acquire()
97            self.started = 0
98            self.terminated =0
99            self.changed.notifyAll()
100            self.changed.release()
101
102    class pooled_thread(Thread):
103        """
104        One of a set of threads dedicated to a specific task.  Uses the
105        thread_pool class above for coordination.
106        """
107        def __init__(self, group=None, target=None, name=None, args=(), 
108                kwargs={}, pdata=None, trace_file=None):
109            Thread.__init__(self, group, target, name, args, kwargs)
110            self.rv = None          # Return value of the ops in this thread
111            self.exception = None   # Exception that terminated this thread
112            self.target=target      # Target function to run on start()
113            self.args = args        # Args to pass to target
114            self.kwargs = kwargs    # Additional kw args
115            self.pdata = pdata      # thread_pool for this class
116            # Logger for this thread
117            self.log = logging.getLogger("fedd.experiment_control")
118       
119        def run(self):
120            """
121            Emulate Thread.run, except add pool data manipulation and error
122            logging.
123            """
124            if self.pdata:
125                self.pdata.start()
126
127            if self.target:
128                try:
129                    self.rv = self.target(*self.args, **self.kwargs)
130                except service_error, s:
131                    self.exception = s
132                    self.log.error("Thread exception: %s %s" % \
133                            (s.code_string(), s.desc))
134                except:
135                    self.exception = sys.exc_info()[1]
136                    self.log.error(("Unexpected thread exception: %s" +\
137                            "Trace %s") % (self.exception,\
138                                traceback.format_exc()))
139            if self.pdata:
140                self.pdata.terminate()
141
142    call_RequestAccess = service_caller('RequestAccess',
143            'getfeddPortType', feddServiceLocator, 
144            RequestAccessRequestMessage, 'RequestAccessRequestBody')
145
146    call_ReleaseAccess = service_caller('ReleaseAccess',
147            'getfeddPortType', feddServiceLocator, 
148            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
149
150    call_Ns2Split = service_caller('Ns2Split',
151            'getfeddInternalPortType', feddInternalServiceLocator, 
152            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
153
154    def __init__(self, config=None, auth=None):
155        """
156        Intialize the various attributes, most from the config object
157        """
158        self.thread_with_rv = experiment_control_local.pooled_thread
159        self.thread_pool = experiment_control_local.thread_pool
160
161        self.cert_file = None
162        self.cert_pwd = None
163        self.trusted_certs = None
164
165        # Walk through the various relevant certificat specifying config
166        # attributes until the local certificate attributes can be resolved.
167        # The walk is from most specific to most general specification.
168        for s in ("experiment_control", "globals"):
169            if config.has_section(s): 
170                if config.has_option(s, "cert_file"):
171                    if not self.cert_file:
172                        self.cert_file = config.get(s, "cert_file")
173                        self.cert_pwd = config.get(s, "cert_pwd")
174
175                if config.has_option(s, "trusted_certs"):
176                    if not self.trusted_certs:
177                        self.trusted_certs = config.get(s, "trusted_certs")
178
179
180        self.exp_stem = "fed-stem"
181        self.log = logging.getLogger("fedd.experiment_control")
182        set_log_level(config, "experiment_control", self.log)
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193
194        self.debug = config.getboolean("experiment_control", "create_debug")
195        self.state_filename = config.get("experiment_control", 
196                "experiment_state_file")
197        self.splitter_url = config.get("experiment_control", "splitter_url")
198        self.fedkit = config.get("experiment_control", "fedkit")
199        accessdb_file = config.get("experiment_control", "accessdb")
200
201        self.ssh_pubkey_file = config.get("experiment_control", 
202                "ssh_pubkey_file")
203        self.ssh_privkey_file = config.get("experiment_control",
204                "ssh_privkey_file")
205        # NB for internal master/slave ops, not experiment setup
206        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
207        self.state = { }
208        self.state_lock = Lock()
209        self.tclsh = "/usr/local/bin/otclsh"
210        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
211        mapdb_file = config.get("experiment_control", "mapdb")
212        self.trace_file = sys.stderr
213
214        self.def_expstart = \
215                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
216                "/tmp/federate";
217        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
218                "FEDDIR/hosts";
219        self.def_gwstart = \
220                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
221                "/tmp/bridge.log";
222        self.def_mgwstart = \
223                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
224                "/tmp/bridge.log";
225        self.def_gwimage = "FBSD61-TUNNEL2";
226        self.def_gwtype = "pc";
227
228        if auth:
229            self.auth = auth
230        else:
231            self.log.error(\
232                    "[access]: No authorizer initialized, creating local one.")
233            auth = authorizer()
234
235
236        if self.ssh_pubkey_file:
237            try:
238                f = open(self.ssh_pubkey_file, 'r')
239                self.ssh_pubkey = f.read()
240                f.close()
241            except IOError:
242                raise service_error(service_error.internal,
243                        "Cannot read sshpubkey")
244        else:
245            raise service_error(service_error.internal, 
246                    "No SSH public key file?")
247
248        if not self.ssh_privkey_file:
249            raise service_error(service_error.internal, 
250                    "No SSH public key file?")
251
252
253        if mapdb_file:
254            self.read_mapdb(mapdb_file)
255        else:
256            self.log.warn("[experiment_control] No testbed map, using defaults")
257            self.tbmap = { 
258                    'deter':'https://users.isi.deterlab.net:23235',
259                    'emulab':'https://users.isi.deterlab.net:23236',
260                    'ucb':'https://users.isi.deterlab.net:23237',
261                    }
262
263        if accessdb_file:
264                self.read_accessdb(accessdb_file)
265        else:
266            raise service_error(service_error.internal,
267                    "No accessdb specified in config")
268
269        # Grab saved state.  OK to do this w/o locking because it's read only
270        # and only one thread should be in existence that can see self.state at
271        # this point.
272        if self.state_filename:
273            self.read_state()
274
275        # Dispatch tables
276        self.soap_services = {\
277                'Create': soap_handler(\
278                        CreateRequestMessage.typecode,
279                        getattr(self, "create_experiment"), 
280                        CreateResponseMessage,
281                        "CreateResponseBody"),
282                'Vtopo': soap_handler(\
283                        VtopoRequestMessage.typecode,
284                        getattr(self, "get_vtopo"),
285                        VtopoResponseMessage,
286                        "VtopoResponseBody"),
287                'Vis': soap_handler(\
288                        VisRequestMessage.typecode,
289                        getattr(self, "get_vis"),
290                        VisResponseMessage,
291                        "VisResponseBody"),
292                'Info': soap_handler(\
293                        InfoRequestMessage.typecode,
294                        getattr(self, "get_info"),
295                        InfoResponseMessage,
296                        "InfoResponseBody"),
297                'Terminate': soap_handler(\
298                        TerminateRequestMessage.typecode,
299                        getattr(self, "terminate_experiment"),
300                        TerminateResponseMessage,
301                        "TerminateResponseBody"),
302        }
303
304        self.xmlrpc_services = {\
305                'Create': xmlrpc_handler(\
306                        getattr(self, "create_experiment"), 
307                        "CreateResponseBody"),
308                'Vtopo': xmlrpc_handler(\
309                        getattr(self, "get_vtopo"),
310                        "VtopoResponseBody"),
311                'Vis': xmlrpc_handler(\
312                        getattr(self, "get_vis"),
313                        "VisResponseBody"),
314                'Info': xmlrpc_handler(\
315                        getattr(self, "get_info"),
316                        "InfoResponseBody"),
317                'Terminate': xmlrpc_handler(\
318                        getattr(self, "terminate_experiment"),
319                        "TerminateResponseBody"),
320        }
321
322    def copy_file(self, src, dest, size=1024):
323        """
324        Exceedingly simple file copy.
325        """
326        s = open(src,'r')
327        d = open(dest, 'w')
328
329        buf = "x"
330        while buf != "":
331            buf = s.read(size)
332            d.write(buf)
333        s.close()
334        d.close()
335
336    # Call while holding self.state_lock
337    def write_state(self):
338        """
339        Write a new copy of experiment state after copying the existing state
340        to a backup.
341
342        State format is a simple pickling of the state dictionary.
343        """
344        if os.access(self.state_filename, os.W_OK):
345            self.copy_file(self.state_filename, \
346                    "%s.bak" % self.state_filename)
347        try:
348            f = open(self.state_filename, 'w')
349            pickle.dump(self.state, f)
350        except IOError, e:
351            self.log.error("Can't write file %s: %s" % \
352                    (self.state_filename, e))
353        except pickle.PicklingError, e:
354            self.log.error("Pickling problem: %s" % e)
355        except TypeError, e:
356            self.log.error("Pickling problem (TypeError): %s" % e)
357
358    # Call while holding self.state_lock
359    def read_state(self):
360        """
361        Read a new copy of experiment state.  Old state is overwritten.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        try:
366            f = open(self.state_filename, "r")
367            self.state = pickle.load(f)
368            self.log.debug("[read_state]: Read state from %s" % \
369                    self.state_filename)
370        except IOError, e:
371            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
372                    % (self.state_filename, e))
373        except pickle.UnpicklingError, e:
374            self.log.warning(("[read_state]: No saved state: " + \
375                    "Unpickling failed: %s") % e)
376       
377        for k in self.state.keys():
378            try:
379                # This list should only have one element in it, but phrasing it
380                # as a for loop doesn't cost much, really.  We have to find the
381                # fedid elements anyway.
382                for eid in [ f['fedid'] \
383                        for f in self.state[k]['experimentID']\
384                            if f.has_key('fedid') ]:
385                    self.auth.set_attribute(self.state[k]['owner'], eid)
386            except KeyError, e:
387                self.log.warning("[read_state]: State ownership or identity " +\
388                        "misformatted in %s: %s" % (self.state_filename, e))
389
390
391    def read_accessdb(self, accessdb_file):
392        """
393        Read the mapping from fedids that can create experiments to their name
394        in the 3-level access namespace.  All will be asserted from this
395        testbed and can include the local username and porject that will be
396        asserted on their behalf by this fedd.  Each fedid is also added to the
397        authorization system with the "create" attribute.
398        """
399        self.accessdb = {}
400        # These are the regexps for parsing the db
401        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
402        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
403                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
404        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
405                "\s*->\s*(" + name_expr + ")\s*$")
406        lineno = 0
407
408        # Parse the mappings and store in self.authdb, a dict of
409        # fedid -> (proj, user)
410        try:
411            f = open(accessdb_file, "r")
412            for line in f:
413                lineno += 1
414                line = line.strip()
415                if len(line) == 0 or line.startswith('#'):
416                    continue
417                m = project_line.match(line)
418                if m:
419                    fid = fedid(hexstr=m.group(1))
420                    project, user = m.group(2,3)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425
426                m = user_line.match(line)
427                if m:
428                    fid = fedid(hexstr=m.group(1))
429                    project = None
430                    user = m.group(2)
431                    if not self.accessdb.has_key(fid):
432                        self.accessdb[fid] = []
433                    self.accessdb[fid].append((project, user))
434                    continue
435                self.log.warn("[experiment_control] Error parsing access " +\
436                        "db %s at line %d" %  (accessdb_file, lineno))
437        except IOError:
438            raise service_error(service_error.internal,
439                    "Error opening/reading %s as experiment " +\
440                            "control accessdb" %  accessdb_file)
441        f.close()
442
443        # Initialize the authorization attributes
444        for fid in self.accessdb.keys():
445            self.auth.set_attribute(fid, 'create')
446
447    def read_mapdb(self, file):
448        """
449        Read a simple colon separated list of mappings for the
450        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
451        """
452
453        self.tbmap = { }
454        lineno =0
455        try:
456            f = open(file, "r")
457            for line in f:
458                lineno += 1
459                line = line.strip()
460                if line.startswith('#') or len(line) == 0:
461                    continue
462                try:
463                    label, url = line.split(':', 1)
464                    self.tbmap[label] = url
465                except ValueError, e:
466                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
467                            "map db: %s %s" % (lineno, line, e))
468        except IOError, e:
469            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
470                    "open %s: %s" % (file, e))
471        f.close()
472
473    def scp_file(self, file, user, host, dest=""):
474        """
475        scp a file to the remote host.  If debug is set the action is only
476        logged.
477        """
478
479        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
480                "%s@%s:%s" % (user, host, dest)]
481        rv = 0
482
483        try:
484            dnull = open("/dev/null", "r")
485        except IOError:
486            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
487            dnull = Null
488
489        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
490        if not self.debug:
491            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
492            else: rv = call(scp_cmd)
493
494        return rv == 0
495
496    def ssh_cmd(self, user, host, cmd, wname=None):
497        """
498        Run a remote command on host as user.  If debug is set, the action is
499        only logged.
500        """
501        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
502                user, host, cmd)
503
504        try:
505            dnull = open("/dev/null", "r")
506        except IOError:
507            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
508            dnull = Null
509
510        self.log.debug("[ssh_cmd]: %s" % sh_str)
511        if not self.debug:
512            if dnull:
513                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
514            else:
515                sub = Popen(sh_str, shell=True)
516            return sub.wait() == 0
517        else:
518            return True
519
520    def ship_configs(self, host, user, src_dir, dest_dir):
521        """
522        Copy federant-specific configuration files to the federant.
523        """
524        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
525            return False
526        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
527            return False
528
529        for f in os.listdir(src_dir):
530            if os.path.isdir(f):
531                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
532                        "%s/%s" % (dest_dir, f)):
533                    return False
534            else:
535                if not self.scp_file("%s/%s" % (src_dir, f), 
536                        user, host, dest_dir):
537                    return False
538        return True
539
540    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
541        """
542        Start a sub-experiment on a federant.
543
544        Get the current state, modify or create as appropriate, ship data and
545        configs and start the experiment.  There are small ordering differences
546        based on the initial state of the sub-experiment.
547        """
548        # ops node in the federant
549        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
550        user = tbparams[tb]['user']     # federant user
551        pid = tbparams[tb]['project']   # federant project
552        # XXX
553        base_confs = ( "hosts",)
554        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
555        # command to test experiment state
556        expinfo_exec = "/usr/testbed/bin/expinfo" 
557        # Configuration directories on the remote machine
558        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
559        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
560        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
561        # Regular expressions to parse the expinfo response
562        state_re = re.compile("State:\s+(\w+)")
563        no_exp_re = re.compile("^No\s+such\s+experiment")
564        state = None    # Experiment state parsed from expinfo
565        # The expinfo ssh command
566        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
567
568        # Get status
569        self.log.debug("[start_segment]: %s"% " ".join(cmd))
570        dev_null = None
571        try:
572            dev_null = open("/dev/null", "a")
573        except IOError, e:
574            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
575
576        if self.debug:
577            state = 'swapped'
578            rv = 0
579        else:
580            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
581            for line in status.stdout:
582                m = state_re.match(line)
583                if m: state = m.group(1)
584                else:
585                    m = no_exp_re.match(line)
586                    if m: state = "none"
587            rv = status.wait()
588
589        # If the experiment is not present the subcommand returns a non-zero
590        # return value.  If we successfully parsed a "none" outcome, ignore the
591        # return code.
592        if rv != 0 and state != "none":
593            raise service_error(service_error.internal,
594                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
595
596        self.log.debug("[start_segment]: %s: %s" % (tb, state))
597        self.log.info("[start_segment]:transferring experiment to %s" % tb)
598
599        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
600            return False
601        # Clear the federation files
602        if not self.ssh_cmd(user, host, 
603                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
604            return False
605        if not self.ssh_cmd(user, host, 
606                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
607            return False
608        # Clear and create the tarfiles and rpm directories
609        for d in (tarfiles_dir, rpms_dir):
610            if not self.ssh_cmd(user, host, 
611                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
612                return False
613            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
614                    "create tarfiles"):
615                return False
616       
617        if state == 'active':
618            # Remote experiment is active.  Modify it.
619            for f in base_confs:
620                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
621                        "%s/%s" % (proj_dir, f)):
622                    return False
623            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
624                    proj_dir):
625                return False
626            if os.path.isdir("%s/tarfiles" % tmpdir):
627                if not self.ship_configs(host, user,
628                        "%s/tarfiles" % tmpdir, tarfiles_dir):
629                    return False
630            if os.path.isdir("%s/rpms" % tmpdir):
631                if not self.ship_configs(host, user,
632                        "%s/rpms" % tmpdir, tarfiles_dir):
633                    return False
634            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
635            if not self.ssh_cmd(user, host,
636                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
637                            (pid, eid, tclfile), "modexp"):
638                return False
639            return True
640        elif state == "swapped":
641            # Remote experiment swapped out.  Modify it and swap it in.
642            for f in base_confs:
643                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
644                        "%s/%s" % (proj_dir, f)):
645                    return False
646            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
647                    proj_dir):
648                return False
649            if os.path.isdir("%s/tarfiles" % tmpdir):
650                if not self.ship_configs(host, user,
651                        "%s/tarfiles" % tmpdir, tarfiles_dir):
652                    return False
653            if os.path.isdir("%s/rpms" % tmpdir):
654                if not self.ship_configs(host, user,
655                        "%s/rpms" % tmpdir, tarfiles_dir):
656                    return False
657            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
658            if not self.ssh_cmd(user, host,
659                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
660                    "modexp"):
661                return False
662            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
663            if not self.ssh_cmd(user, host,
664                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
665                    "swapexp"):
666                return False
667            return True
668        elif state == "none":
669            # No remote experiment.  Create one.  We do this in 2 steps so we
670            # can put the configuration files and scripts into the new
671            # experiment directories.
672
673            # Tarfiles must be present for creation to work
674            if os.path.isdir("%s/tarfiles" % tmpdir):
675                if not self.ship_configs(host, user,
676                        "%s/tarfiles" % tmpdir, tarfiles_dir):
677                    return False
678            if os.path.isdir("%s/rpms" % tmpdir):
679                if not self.ship_configs(host, user,
680                        "%s/rpms" % tmpdir, tarfiles_dir):
681                    return False
682            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
683            if not self.ssh_cmd(user, host,
684                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
685                            (pid, eid, tclfile), "startexp"):
686                return False
687            # After startexp the per-experiment directories exist
688            for f in base_confs:
689                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
690                        "%s/%s" % (proj_dir, f)):
691                    return False
692            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
693                    proj_dir):
694                return False
695            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
696            if not self.ssh_cmd(user, host,
697                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
698                    "swapexp"):
699                return False
700            return True
701        else:
702            self.log.debug("[start_segment]:unknown state %s" % state)
703            return False
704
705    def stop_segment(self, tb, eid, tbparams):
706        """
707        Stop a sub experiment by calling swapexp on the federant
708        """
709        user = tbparams[tb]['user']
710        host = tbparams[tb]['host']
711        pid = tbparams[tb]['project']
712
713        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
714        return self.ssh_cmd(user, host,
715                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
716
717       
718    def generate_ssh_keys(self, dest, type="rsa" ):
719        """
720        Generate a set of keys for the gateways to use to talk.
721
722        Keys are of type type and are stored in the required dest file.
723        """
724        valid_types = ("rsa", "dsa")
725        t = type.lower();
726        if t not in valid_types: raise ValueError
727        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
728
729        try:
730            trace = open("/dev/null", "w")
731        except IOError:
732            raise service_error(service_error.internal,
733                    "Cannot open /dev/null??");
734
735        # May raise CalledProcessError
736        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
737        rv = call(cmd, stdout=trace, stderr=trace)
738        if rv != 0:
739            raise service_error(service_error.internal, 
740                    "Cannot generate nonce ssh keys.  %s return code %d" \
741                            % (self.ssh_keygen, rv))
742
743    def gentopo(self, str):
744        """
745        Generate the topology dtat structure from the splitter's XML
746        representation of it.
747
748        The topology XML looks like:
749            <experiment>
750                <nodes>
751                    <node><vname></vname><ips>ip1:ip2</ips></node>
752                </nodes>
753                <lans>
754                    <lan>
755                        <vname></vname><vnode></vnode><ip></ip>
756                        <bandwidth></bandwidth><member>node:port</member>
757                    </lan>
758                </lans>
759        """
760        class topo_parse:
761            """
762            Parse the topology XML and create the dats structure.
763            """
764            def __init__(self):
765                # Typing of the subelements for data conversion
766                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
767                self.int_subelements = ( 'bandwidth',)
768                self.float_subelements = ( 'delay',)
769                # The final data structure
770                self.nodes = [ ]
771                self.lans =  [ ]
772                self.topo = { \
773                        'node': self.nodes,\
774                        'lan' : self.lans,\
775                    }
776                self.element = { }  # Current element being created
777                self.chars = ""     # Last text seen
778
779            def end_element(self, name):
780                # After each sub element the contents is added to the current
781                # element or to the appropriate list.
782                if name == 'node':
783                    self.nodes.append(self.element)
784                    self.element = { }
785                elif name == 'lan':
786                    self.lans.append(self.element)
787                    self.element = { }
788                elif name in self.str_subelements:
789                    self.element[name] = self.chars
790                    self.chars = ""
791                elif name in self.int_subelements:
792                    self.element[name] = int(self.chars)
793                    self.chars = ""
794                elif name in self.float_subelements:
795                    self.element[name] = float(self.chars)
796                    self.chars = ""
797
798            def found_chars(self, data):
799                self.chars += data.rstrip()
800
801
802        tp = topo_parse();
803        parser = xml.parsers.expat.ParserCreate()
804        parser.EndElementHandler = tp.end_element
805        parser.CharacterDataHandler = tp.found_chars
806
807        parser.Parse(str)
808
809        return tp.topo
810       
811
812    def genviz(self, topo):
813        """
814        Generate the visualization the virtual topology
815        """
816
817        neato = "/usr/local/bin/neato"
818        # These are used to parse neato output and to create the visualization
819        # file.
820        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
821        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
822                "%s</type></node>"
823
824        try:
825            # Node names
826            nodes = [ n['vname'] for n in topo['node'] ]
827            topo_lans = topo['lan']
828        except KeyError:
829            raise service_error(service_error.internal, "Bad topology")
830
831        lans = { }
832        links = { }
833
834        # Walk through the virtual topology, organizing the connections into
835        # 2-node connections (links) and more-than-2-node connections (lans).
836        # When a lan is created, it's added to the list of nodes (there's a
837        # node in the visualization for the lan).
838        for l in topo_lans:
839            if links.has_key(l['vname']):
840                if len(links[l['vname']]) < 2:
841                    links[l['vname']].append(l['vnode'])
842                else:
843                    nodes.append(l['vname'])
844                    lans[l['vname']] = links[l['vname']]
845                    del links[l['vname']]
846                    lans[l['vname']].append(l['vnode'])
847            elif lans.has_key(l['vname']):
848                lans[l['vname']].append(l['vnode'])
849            else:
850                links[l['vname']] = [ l['vnode'] ]
851
852
853        # Open up a temporary file for dot to turn into a visualization
854        try:
855            df, dotname = tempfile.mkstemp()
856            dotfile = os.fdopen(df, 'w')
857        except IOError:
858            raise service_error(service_error.internal,
859                    "Failed to open file in genviz")
860
861        # Generate a dot/neato input file from the links, nodes and lans
862        try:
863            print >>dotfile, "graph G {"
864            for n in nodes:
865                print >>dotfile, '\t"%s"' % n
866            for l in links.keys():
867                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
868            for l in lans.keys():
869                for n in lans[l]:
870                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
871            print >>dotfile, "}"
872            dotfile.close()
873        except TypeError:
874            raise service_error(service_error.internal,
875                    "Single endpoint link in vtopo")
876        except IOError:
877            raise service_error(service_error.internal, "Cannot write dot file")
878
879        # Use dot to create a visualization
880        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
881                '-Gpack=true', dotname], stdout=PIPE)
882
883        # Translate dot to vis format
884        vis_nodes = [ ]
885        vis = { 'node': vis_nodes }
886        for line in dot.stdout:
887            m = vis_re.match(line)
888            if m:
889                vn = m.group(1)
890                vis_node = {'name': vn, \
891                        'x': float(m.group(2)),\
892                        'y' : float(m.group(3)),\
893                    }
894                if vn in links.keys() or vn in lans.keys():
895                    vis_node['type'] = 'lan'
896                else:
897                    vis_node['type'] = 'node'
898                vis_nodes.append(vis_node)
899        rv = dot.wait()
900
901        os.remove(dotname)
902        if rv == 0 : return vis
903        else: return None
904
905    def get_access(self, tb, nodes, user, tbparam, master, export_project,
906            access_user):
907        """
908        Get access to testbed through fedd and set the parameters for that tb
909        """
910
911        translate_attr = {
912            'slavenodestartcmd': 'expstart',
913            'slaveconnectorstartcmd': 'gwstart',
914            'masternodestartcmd': 'mexpstart',
915            'masterconnectorstartcmd': 'mgwstart',
916            'connectorimage': 'gwimage',
917            'connectortype': 'gwtype',
918            'tunnelcfg': 'tun',
919            'smbshare': 'smbshare',
920        }
921
922        uri = self.tbmap.get(tb, None)
923        if not uri:
924            raise service_error(serice_error.server_config, 
925                    "Unknown testbed: %s" % tb)
926
927        # currently this lumps all users into one service access group
928        service_keys = [ a for u in user \
929                for a in u.get('access', []) \
930                    if a.has_key('sshPubkey')]
931
932        if len(service_keys) == 0:
933            raise service_error(service_error.req, 
934                    "Must have at least one SSH pubkey for services")
935
936
937        for p, u in access_user:
938            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
939                    "to %s") %  ((p or "None"), u, uri))
940
941            if p:
942                # Request with user and project specified
943                req = {\
944                        'destinationTestbed' : { 'uri' : uri },
945                        'project': { 
946                            'name': {'localname': p},
947                            'user': [ {'userID': { 'localname': u } } ],
948                            },
949                        'user':  user,
950                        'allocID' : { 'localname': 'test' },
951                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
952                        'serviceAccess' : service_keys
953                    }
954            else:
955                # Request with only user specified
956                req = {\
957                        'destinationTestbed' : { 'uri' : uri },
958                        'user':  [ {'userID': { 'localname': u } } ],
959                        'allocID' : { 'localname': 'test' },
960                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
961                        'serviceAccess' : service_keys
962                    }
963
964            if tb == master:
965                # NB, the export_project parameter is a dict that includes
966                # the type
967                req['exportProject'] = export_project
968
969            # node resources if any
970            if nodes != None and len(nodes) > 0:
971                rnodes = [ ]
972                for n in nodes:
973                    rn = { }
974                    image, hw, count = n.split(":")
975                    if image: rn['image'] = [ image ]
976                    if hw: rn['hardware'] = [ hw ]
977                    if count: rn['count'] = int(count)
978                    rnodes.append(rn)
979                req['resources']= { }
980                req['resources']['node'] = rnodes
981
982            try:
983                r = self.call_RequestAccess(uri, req, 
984                        self.cert_file, self.cert_pwd, self.trusted_certs)
985            except service_error, e:
986                if e.code == service_error.access:
987                    self.log.debug("[get_access] Access denied")
988                    r = None
989                    continue
990                else:
991                    raise e
992
993            if r.has_key('RequestAccessResponseBody'):
994                # Through to here we have a valid response, not a fault.
995                # Access denied is a fault, so something better or worse than
996                # access denied has happened.
997                r = r['RequestAccessResponseBody']
998                self.log.debug("[get_access] Access granted")
999                break
1000            else:
1001                raise service_error(service_error.protocol,
1002                        "Bad proxy response")
1003       
1004        if not r:
1005            raise service_error(service_error.access, 
1006                    "Access denied by %s (%s)" % (tb, uri))
1007
1008        e = r['emulab']
1009        p = e['project']
1010        tbparam[tb] = { 
1011                "boss": e['boss'],
1012                "host": e['ops'],
1013                "domain": e['domain'],
1014                "fs": e['fileServer'],
1015                "eventserver": e['eventServer'],
1016                "project": unpack_id(p['name']),
1017                "emulab" : e,
1018                "allocID" : r['allocID'],
1019                }
1020        # Make the testbed name be the label the user applied
1021        p['testbed'] = {'localname': tb }
1022
1023        for u in p['user']:
1024            tbparam[tb]['user'] = unpack_id(u['userID'])
1025
1026        for a in e['fedAttr']:
1027            if a['attribute']:
1028                key = translate_attr.get(a['attribute'].lower(), None)
1029                if key:
1030                    tbparam[tb][key]= a['value']
1031       
1032    def release_access(self, tb, aid):
1033        """
1034        Release access to testbed through fedd
1035        """
1036
1037        uri = self.tbmap.get(tb, None)
1038        if not uri:
1039            raise service_error(serice_error.server_config, 
1040                    "Unknown testbed: %s" % tb)
1041
1042        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1043                self.cert_file, self.cert_pwd, self.trusted_certs)
1044
1045        # better error coding
1046
1047    def remote_splitter(self, uri, desc, master):
1048
1049        req = {
1050                'description' : { 'ns2description': desc },
1051                'master': master,
1052                'include_fedkit': bool(self.fedkit)
1053            }
1054
1055        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1056                self.trusted_certs)
1057
1058        if r.has_key('Ns2SplitResponseBody'):
1059            r = r['Ns2SplitResponseBody']
1060            if r.has_key('output'):
1061                return r['output'].splitlines()
1062            else:
1063                raise service_error(service_error.protocol, 
1064                        "Bad splitter response (no output)")
1065        else:
1066            raise service_error(service_error.protocol, "Bad splitter response")
1067       
1068    class current_testbed:
1069        """
1070        Object for collecting the current testbed description.  The testbed
1071        description is saved to a file with the local testbed variables
1072        subsittuted line by line.
1073        """
1074        def __init__(self, eid, tmpdir, fedkit):
1075            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1076            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1077            self.current_testbed = None
1078            self.testbed_file = None
1079
1080            self.def_expstart = \
1081                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1082            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1083            self.def_gwstart = \
1084                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1085            self.def_mgwstart = \
1086                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1087            self.def_gwimage = "FBSD61-TUNNEL2";
1088            self.def_gwtype = "pc";
1089
1090            self.eid = eid
1091            self.tmpdir = tmpdir
1092            self.fedkit = fedkit
1093
1094        def __call__(self, line, master, allocated, tbparams):
1095            # Capture testbed topology descriptions
1096            if self.current_testbed == None:
1097                m = self.begin_testbed.match(line)
1098                if m != None:
1099                    self.current_testbed = m.group(1)
1100                    if self.current_testbed == None:
1101                        raise service_error(service_error.req,
1102                                "Bad request format (unnamed testbed)")
1103                    allocated[self.current_testbed] = \
1104                            allocated.get(self.current_testbed,0) + 1
1105                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1106                    if not os.path.exists(tb_dir):
1107                        try:
1108                            os.mkdir(tb_dir)
1109                        except IOError:
1110                            raise service_error(service_error.internal,
1111                                    "Cannot create %s" % tb_dir)
1112                    try:
1113                        self.testbed_file = open("%s/%s.%s.tcl" %
1114                                (tb_dir, self.eid, self.current_testbed), 'w')
1115                    except IOError:
1116                        self.testbed_file = None
1117                    return True
1118                else: return False
1119            else:
1120                m = self.end_testbed.match(line)
1121                if m != None:
1122                    if m.group(1) != self.current_testbed:
1123                        raise service_error(service_error.internal, 
1124                                "Mismatched testbed markers!?")
1125                    if self.testbed_file != None: 
1126                        self.testbed_file.close()
1127                        self.testbed_file = None
1128                    self.current_testbed = None
1129                elif self.testbed_file:
1130                    # Substitute variables and put the line into the local
1131                    # testbed file.
1132                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1133                            self.def_gwtype)
1134                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1135                            self.def_gwimage)
1136                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1137                            self.def_mgwstart)
1138                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1139                            self.def_mexpstart)
1140                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1141                            self.def_gwstart)
1142                    expstart = tbparams[self.current_testbed].get('expstart', 
1143                            self.def_expstart)
1144                    project = tbparams[self.current_testbed].get('project')
1145                    line = re.sub("GWTYPE", gwtype, line)
1146                    line = re.sub("GWIMAGE", gwimage, line)
1147                    if self.current_testbed == master:
1148                        line = re.sub("GWSTART", mgwstart, line)
1149                        line = re.sub("EXPSTART", mexpstart, line)
1150                    else:
1151                        line = re.sub("GWSTART", gwstart, line)
1152                        line = re.sub("EXPSTART", expstart, line)
1153                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1154                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1155                    line = re.sub("EID", self.eid, line)
1156                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1157                            (project, self.eid), line)
1158                    if self.fedkit:
1159                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1160                                line)
1161                    print >>self.testbed_file, line
1162                return True
1163
1164    class allbeds:
1165        """
1166        Process the Allbeds section.  Get access to each federant and save the
1167        parameters in tbparams
1168        """
1169        def __init__(self, get_access):
1170            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1171            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1172            self.in_allbeds = False
1173            self.get_access = get_access
1174
1175        def __call__(self, line, user, tbparams, master, export_project,
1176                access_user):
1177            # Testbed access parameters
1178            if not self.in_allbeds:
1179                if self.begin_allbeds.match(line):
1180                    self.in_allbeds = True
1181                    return True
1182                else:
1183                    return False
1184            else:
1185                if self.end_allbeds.match(line):
1186                    self.in_allbeds = False
1187                else:
1188                    nodes = line.split('|')
1189                    tb = nodes.pop(0)
1190                    self.get_access(tb, nodes, user, tbparams, master,
1191                            export_project, access_user)
1192                return True
1193
1194    class gateways:
1195        def __init__(self, eid, master, tmpdir, gw_pubkey,
1196                gw_secretkey, copy_file, fedkit):
1197            self.begin_gateways = \
1198                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1199            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1200            self.current_gateways = None
1201            self.control_gateway = None
1202            self.active_end = { }
1203
1204            self.eid = eid
1205            self.master = master
1206            self.tmpdir = tmpdir
1207            self.gw_pubkey_base = gw_pubkey
1208            self.gw_secretkey_base = gw_secretkey
1209
1210            self.copy_file = copy_file
1211            self.fedkit = fedkit
1212
1213
1214        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1215                active_end, tbparams, dtb, myname, desthost, type):
1216            """
1217            Produce a gateway configuration file from a gateways line.
1218            """
1219
1220            sproject = tbparams[gw].get('project', 'project')
1221            dproject = tbparams[dtb].get('project', 'project')
1222            sdomain = ".%s.%s%s" % (eid, sproject,
1223                    tbparams[gw].get('domain', ".example.com"))
1224            ddomain = ".%s.%s%s" % (eid, dproject,
1225                    tbparams[dtb].get('domain', ".example.com"))
1226            boss = tbparams[master].get('boss', "boss")
1227            fs = tbparams[master].get('fs', "fs")
1228            event_server = "%s%s" % \
1229                    (tbparams[gw].get('eventserver', "event_server"),
1230                            tbparams[gw].get('domain', "example.com"))
1231            remote_event_server = "%s%s" % \
1232                    (tbparams[dtb].get('eventserver', "event_server"),
1233                            tbparams[dtb].get('domain', "example.com"))
1234            seer_control = "%s%s" % \
1235                    (tbparams[gw].get('control', "control"), sdomain)
1236
1237            if self.fedkit:
1238                remote_script_dir = "/usr/local/federation/bin"
1239                local_script_dir = "/usr/local/federation/bin"
1240            else:
1241                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1242                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1243
1244            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1245            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1246            tunnel_cfg = tbparams[gw].get("tun", "false")
1247
1248            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1249            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1250
1251            # translate to lower case so the `hostname` hack for specifying
1252            # configuration files works.
1253            conf_file = conf_file.lower();
1254            remote_conf_file = remote_conf_file.lower();
1255
1256            if dtb == master:
1257                active = "false"
1258            elif gw == master:
1259                active = "true"
1260            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1261                active = "false"
1262            else:
1263                active_end['%s-%s' % (gw, dtb)] = 1
1264                active = "true"
1265
1266            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1267            print >>gwconfig, "Active: %s" % active
1268            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1269            print >>gwconfig, "BossName: %s" % boss
1270            print >>gwconfig, "FsName: %s" % fs
1271            print >>gwconfig, "EventServerName: %s" % event_server
1272            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1273            print >>gwconfig, "SeerControl: %s" % seer_control
1274            print >>gwconfig, "Type: %s" % type
1275            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1276            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1277                    local_script_dir
1278            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1279            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1280            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1281                    (remote_conf_dir, remote_conf_file)
1282            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1283            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1284            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1285            gwconfig.close()
1286
1287            return active == "true"
1288
1289        def __call__(self, line, allocated, tbparams):
1290            # Process gateways
1291            if not self.current_gateways:
1292                m = self.begin_gateways.match(line)
1293                if m:
1294                    self.current_gateways = m.group(1)
1295                    if allocated.has_key(self.current_gateways):
1296                        # This test should always succeed
1297                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1298                        if not os.path.exists(tb_dir):
1299                            try:
1300                                os.mkdir(tb_dir)
1301                            except IOError:
1302                                raise service_error(service_error.internal,
1303                                        "Cannot create %s" % tb_dir)
1304                    else:
1305                        # XXX
1306                        self.log.error("[gateways]: Ignoring gateways for " + \
1307                                "unknown testbed %s" % self.current_gateways)
1308                        self.current_gateways = None
1309                    return True
1310                else:
1311                    return False
1312            else:
1313                m = self.end_gateways.match(line)
1314                if m :
1315                    if m.group(1) != self.current_gateways:
1316                        raise service_error(service_error.internal,
1317                                "Mismatched gateway markers!?")
1318                    if self.control_gateway:
1319                        try:
1320                            cc = open("%s/%s/client.conf" %
1321                                    (self.tmpdir, self.current_gateways), 'w')
1322                            print >>cc, "ControlGateway: %s" % \
1323                                    self.control_gateway
1324                            if tbparams[self.master].has_key('smbshare'):
1325                                print >>cc, "SMBSHare: %s" % \
1326                                        tbparams[self.master]['smbshare']
1327                            print >>cc, "ProjectUser: %s" % \
1328                                    tbparams[self.master]['user']
1329                            print >>cc, "ProjectName: %s" % \
1330                                    tbparams[self.master]['project']
1331                            cc.close()
1332                        except IOError:
1333                            raise service_error(service_error.internal,
1334                                    "Error creating client config")
1335                        try:
1336                            cc = open("%s/%s/seer.conf" %
1337                                    (self.tmpdir, self.current_gateways),
1338                                    'w')
1339                            if self.current_gateways != self.master:
1340                                print >>cc, "ControlNode: %s" % \
1341                                        self.control_gateway
1342                            print >>cc, "ExperimentID: %s/%s" % \
1343                                    ( tbparams[self.master]['project'], \
1344                                    self.eid )
1345                            cc.close()
1346                        except IOError:
1347                            raise service_error(service_error.internal,
1348                                    "Error creating seer config")
1349                    else:
1350                        debug.error("[gateways]: No control gateway for %s" %\
1351                                    self.current_gateways)
1352                    self.current_gateways = None
1353                else:
1354                    dtb, myname, desthost, type = line.split(" ")
1355
1356                    if type == "control" or type == "both":
1357                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1358                                self.eid, 
1359                                tbparams[self.current_gateways]['project'],
1360                                tbparams[self.current_gateways]['domain'])
1361                    try:
1362                        active = self.gateway_conf_file(self.current_gateways,
1363                                self.master, self.eid, self.gw_pubkey_base,
1364                                self.gw_secretkey_base,
1365                                self.active_end, tbparams, dtb, myname,
1366                                desthost, type)
1367                    except IOError, e:
1368                        raise service_error(service_error.internal,
1369                                "Failed to write config file for %s" % \
1370                                        self.current_gateway)
1371           
1372                    gw_pubkey = "%s/keys/%s" % \
1373                            (self.tmpdir, self.gw_pubkey_base)
1374                    gw_secretkey = "%s/keys/%s" % \
1375                            (self.tmpdir, self.gw_secretkey_base)
1376
1377                    pkfile = "%s/%s/%s" % \
1378                            ( self.tmpdir, self.current_gateways, 
1379                                    self.gw_pubkey_base)
1380                    skfile = "%s/%s/%s" % \
1381                            ( self.tmpdir, self.current_gateways, 
1382                                    self.gw_secretkey_base)
1383
1384                    if not os.path.exists(pkfile):
1385                        try:
1386                            self.copy_file(gw_pubkey, pkfile)
1387                        except IOError:
1388                            service_error(service_error.internal,
1389                                    "Failed to copy pubkey file")
1390
1391                    if active and not os.path.exists(skfile):
1392                        try:
1393                            self.copy_file(gw_secretkey, skfile)
1394                        except IOError:
1395                            service_error(service_error.internal,
1396                                    "Failed to copy secretkey file")
1397                return True
1398
1399    class shunt_to_file:
1400        """
1401        Simple class to write data between two regexps to a file.
1402        """
1403        def __init__(self, begin, end, filename):
1404            """
1405            Begin shunting on a match of begin, stop on end, send data to
1406            filename.
1407            """
1408            self.begin = re.compile(begin)
1409            self.end = re.compile(end)
1410            self.in_shunt = False
1411            self.file = None
1412            self.filename = filename
1413
1414        def __call__(self, line):
1415            """
1416            Call this on each line in the input that may be shunted.
1417            """
1418            if not self.in_shunt:
1419                if self.begin.match(line):
1420                    self.in_shunt = True
1421                    try:
1422                        self.file = open(self.filename, "w")
1423                    except:
1424                        self.file = None
1425                        raise
1426                    return True
1427                else:
1428                    return False
1429            else:
1430                if self.end.match(line):
1431                    if self.file: 
1432                        self.file.close()
1433                        self.file = None
1434                    self.in_shunt = False
1435                else:
1436                    if self.file:
1437                        print >>self.file, line
1438                return True
1439
1440    class shunt_to_list:
1441        """
1442        Same interface as shunt_to_file.  Data collected in self.list, one list
1443        element per line.
1444        """
1445        def __init__(self, begin, end):
1446            self.begin = re.compile(begin)
1447            self.end = re.compile(end)
1448            self.in_shunt = False
1449            self.list = [ ]
1450       
1451        def __call__(self, line):
1452            if not self.in_shunt:
1453                if self.begin.match(line):
1454                    self.in_shunt = True
1455                    return True
1456                else:
1457                    return False
1458            else:
1459                if self.end.match(line):
1460                    self.in_shunt = False
1461                else:
1462                    self.list.append(line)
1463                return True
1464
1465    class shunt_to_string:
1466        """
1467        Same interface as shunt_to_file.  Data collected in self.str, all in
1468        one string.
1469        """
1470        def __init__(self, begin, end):
1471            self.begin = re.compile(begin)
1472            self.end = re.compile(end)
1473            self.in_shunt = False
1474            self.str = ""
1475       
1476        def __call__(self, line):
1477            if not self.in_shunt:
1478                if self.begin.match(line):
1479                    self.in_shunt = True
1480                    return True
1481                else:
1482                    return False
1483            else:
1484                if self.end.match(line):
1485                    self.in_shunt = False
1486                else:
1487                    self.str += line
1488                return True
1489
1490    def create_experiment(self, req, fid):
1491        """
1492        The external interface to experiment creation called from the
1493        dispatcher.
1494
1495        Creates a working directory, splits the incoming description using the
1496        splitter script and parses out the avrious subsections using the
1497        lcasses above.  Once each sub-experiment is created, use pooled threads
1498        to instantiate them and start it all up.
1499        """
1500
1501        if not self.auth.check_attribute(fid, 'create'):
1502            raise service_error(service_error.access, "Create access denied")
1503
1504        try:
1505            tmpdir = tempfile.mkdtemp(prefix="split-")
1506        except IOError:
1507            raise service_error(service_error.internal, "Cannot create tmp dir")
1508
1509        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1510        gw_secretkey_base = "fed.%s" % self.ssh_type
1511        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1512        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1513        tclfile = tmpdir + "/experiment.tcl"
1514        tbparams = { }
1515        try:
1516            access_user = self.accessdb[fid]
1517        except KeyError:
1518            raise service_error(service_error.internal,
1519                    "Access map and authorizer out of sync in " + \
1520                            "create_experiment for fedid %s"  % fid)
1521
1522        pid = "dummy"
1523        gid = "dummy"
1524        # XXX
1525        fail_soft = False
1526
1527        try:
1528            os.mkdir(tmpdir+"/keys")
1529        except OSError:
1530            raise service_error(service_error.internal,
1531                    "Can't make temporary dir")
1532
1533        req = req.get('CreateRequestBody', None)
1534        if not req:
1535            raise service_error(service_error.req,
1536                    "Bad request format (no CreateRequestBody)")
1537        # The tcl parser needs to read a file so put the content into that file
1538        descr=req.get('experimentdescription', None)
1539        if descr:
1540            file_content=descr.get('ns2description', None)
1541            if file_content:
1542                try:
1543                    f = open(tclfile, 'w')
1544                    f.write(file_content)
1545                    f.close()
1546                except IOError:
1547                    raise service_error(service_error.internal,
1548                            "Cannot write temp experiment description")
1549            else:
1550                raise service_error(service_error.req, 
1551                        "Only ns2descriptions supported")
1552        else:
1553            raise service_error(service_error.req, "No experiment description")
1554
1555        if req.has_key('experimentID') and \
1556                req['experimentID'].has_key('localname'):
1557            eid = req['experimentID']['localname']
1558            self.state_lock.acquire()
1559            while (self.state.has_key(eid)):
1560                eid += random.choice(string.ascii_letters)
1561            # To avoid another thread picking this localname
1562            self.state[eid] = "placeholder"
1563            self.state_lock.release()
1564        else:
1565            eid = self.exp_stem
1566            for i in range(0,5):
1567                eid += random.choice(string.ascii_letters)
1568            self.state_lock.acquire()
1569            while (self.state.has_key(eid)):
1570                eid = self.exp_stem
1571                for i in range(0,5):
1572                    eid += random.choice(string.ascii_letters)
1573            # To avoid another thread picking this localname
1574            self.state[eid] = "placeholder"
1575            self.state_lock.release()
1576
1577        try:
1578            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1579        except ValueError:
1580            raise service_error(service_error.server_config, 
1581                    "Bad key type (%s)" % self.ssh_type)
1582
1583        user = req.get('user', None)
1584        if user == None:
1585            raise service_error(service_error.req, "No user")
1586
1587        master = req.get('master', None)
1588        if not master:
1589            raise service_error(service_error.req, "No master testbed label")
1590        export_project = req.get('exportProject', None)
1591        if not export_project:
1592            raise service_error(service_error.req, "No export project")
1593       
1594        if self.splitter_url:
1595            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1596            split_data = self.remote_splitter(self.splitter_url, file_content,
1597                    master)
1598        else:
1599            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1600                str(self.muxmax), '-m', master]
1601
1602            if self.fedkit:
1603                tclcmd.append('-k')
1604
1605            tclcmd.extend([pid, gid, eid, tclfile])
1606
1607            self.log.debug("running local splitter %s", " ".join(tclcmd))
1608            tclparser = Popen(tclcmd, stdout=PIPE)
1609            split_data = tclparser.stdout
1610
1611        allocated = { }     # Testbeds we can access
1612        started = { }       # Testbeds where a sub-experiment started
1613                            # successfully
1614
1615        # Objects to parse the splitter output (defined above)
1616        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1617        parse_allbeds = self.allbeds(self.get_access)
1618        parse_gateways = self.gateways(eid, master, tmpdir,
1619                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1620        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1621                    "^#\s+End\s+Vtopo")
1622        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1623                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1624        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1625                "^#\s+End\s+tarfiles")
1626        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1627                "^#\s+End\s+rpms")
1628
1629        try: 
1630            # Working on the split data
1631            for line in split_data:
1632                line = line.rstrip()
1633                if parse_current_testbed(line, master, allocated, tbparams):
1634                    continue
1635                elif parse_allbeds(line, user, tbparams, master, export_project,
1636                        access_user):
1637                    continue
1638                elif parse_gateways(line, allocated, tbparams):
1639                    continue
1640                elif parse_vtopo(line):
1641                    continue
1642                elif parse_hostnames(line):
1643                    continue
1644                elif parse_tarfiles(line):
1645                    continue
1646                elif parse_rpms(line):
1647                    continue
1648                else:
1649                    raise service_error(service_error.internal, 
1650                            "Bad tcl parse? %s" % line)
1651        except service_error, e:
1652            # If something goes wrong in the parse (usually an access error)
1653            # clear the placeholder state
1654            self.state_lock.acquire()
1655            del self.state[eid]
1656            self.state_lock.release()
1657            raise e
1658       
1659        # Virtual topology and visualization
1660        vtopo = self.gentopo(parse_vtopo.str)
1661        if not vtopo:
1662            raise service_error(service_error.internal, 
1663                    "Failed to generate virtual topology")
1664
1665        vis = self.genviz(vtopo)
1666        if not vis:
1667            raise service_error(service_error.internal, 
1668                    "Failed to generate visualization")
1669
1670        # save federant information
1671        for k in allocated.keys():
1672            tbparams[k]['federant'] = {\
1673                    'name': [ { 'localname' : eid} ],\
1674                    'emulab': tbparams[k]['emulab'],\
1675                    'allocID' : tbparams[k]['allocID'],\
1676                    'master' : k == master,\
1677                }
1678
1679
1680        # Copy tarfiles and rpms needed at remote sites into a staging area
1681        try:
1682            if self.fedkit:
1683                parse_tarfiles.list.append(self.fedkit)
1684            for t in parse_tarfiles.list:
1685                if not os.path.exists("%s/tarfiles" % tmpdir):
1686                    os.mkdir("%s/tarfiles" % tmpdir)
1687                self.copy_file(t, "%s/tarfiles/%s" % \
1688                        (tmpdir, os.path.basename(t)))
1689            for r in parse_rpms.list:
1690                if not os.path.exists("%s/rpms" % tmpdir):
1691                    os.mkdir("%s/rpms" % tmpdir)
1692                self.copy_file(r, "%s/rpms/%s" % \
1693                        (tmpdir, os.path.basename(r)))
1694        except IOError, e:
1695            raise service_error(service_error.internal, 
1696                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1697
1698        thread_pool_info = self.thread_pool()
1699        threads = [ ]
1700
1701        for tb in [ k for k in allocated.keys() if k != master]:
1702            # Wait until we have a free slot to start the next testbed load
1703            thread_pool_info.acquire()
1704            while thread_pool_info.started - \
1705                    thread_pool_info.terminated >= self.nthreads:
1706                thread_pool_info.wait()
1707            thread_pool_info.release()
1708
1709            # Create and start a thread to start the segment, and save it to
1710            # get the return value later
1711            t  = self.pooled_thread(target=self.start_segment, 
1712                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1713                    pdata=thread_pool_info, trace_file=self.trace_file)
1714            threads.append(t)
1715            t.start()
1716
1717        # Wait until all finish (the first clause of the while is to make sure
1718        # one starts)
1719        thread_pool_info.acquire()
1720        while thread_pool_info.started == 0 or \
1721                thread_pool_info.started > thread_pool_info.terminated:
1722            thread_pool_info.wait()
1723        thread_pool_info.release()
1724
1725        # If none failed, start the master
1726        failed = [ t.getName() for t in threads if not t.rv ]
1727
1728        if len(failed) == 0:
1729            if not self.start_segment(master, eid, tbparams, tmpdir):
1730                failed.append(master)
1731
1732        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1733        # If one failed clean up, unless fail_soft is set
1734        if failed:
1735            if not fail_soft:
1736                for tb in succeeded:
1737                    self.stop_segment(tb, eid, tbparams)
1738                # Remove the placeholder
1739                self.state_lock.acquire()
1740                del self.state[eid]
1741                self.state_lock.release()
1742
1743                raise service_error(service_error.federant,
1744                    "Swap in failed on %s" % ",".join(failed))
1745        else:
1746            self.log.info("[start_segment]: Experiment %s started" % eid)
1747
1748        # Generate an ID for the experiment (slice) and a certificate that the
1749        # allocator can use to prove they own it.  We'll ship it back through
1750        # the encrypted connection.
1751        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1752
1753        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1754
1755        # Walk up tmpdir, deleting as we go
1756        for path, dirs, files in os.walk(tmpdir, topdown=False):
1757            for f in files:
1758                os.remove(os.path.join(path, f))
1759            for d in dirs:
1760                os.rmdir(os.path.join(path, d))
1761        os.rmdir(tmpdir)
1762
1763        # The deepcopy prevents the allocation ID and other binaries from being
1764        # translated into other formats
1765        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1766                for tb in tbparams.keys() \
1767                    if tbparams[tb].has_key('federant') ],\
1768                    'vtopo': vtopo,\
1769                    'vis' : vis,
1770                    'experimentID' : [\
1771                            { 'fedid': copy.copy(expid) }, \
1772                            { 'localname': eid },\
1773                        ],\
1774                    'experimentAccess': { 'X509' : expcert },\
1775                }
1776
1777        # Insert the experiment into our state and update the disk copy
1778        self.state_lock.acquire()
1779        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1780                for tb in tbparams.keys() \
1781                    if tbparams[tb].has_key('federant') ],\
1782                    'vtopo': vtopo,\
1783                    'vis' : vis,
1784                    'owner': fid,
1785                    'experimentID' : [\
1786                            { 'fedid': expid }, { 'localname': eid },\
1787                        ],\
1788                }
1789        self.state[eid] = self.state[expid]
1790        if self.state_filename: self.write_state()
1791        self.state_lock.release()
1792
1793        self.auth.set_attribute(fid, expid)
1794        self.auth.set_attribute(expid, expid)
1795
1796        if not failed:
1797            return resp
1798        else:
1799            raise service_error(service_error.partial, \
1800                    "Partial swap in on %s" % ",".join(succeeded))
1801
1802    def check_experiment_access(self, fid, key):
1803        """
1804        Confirm that the fid has access to the experiment.  Though a request
1805        may be made in terms of a local name, the access attribute is always
1806        the experiment's fedid.
1807        """
1808        if not isinstance(key, fedid):
1809            self.state_lock.acquire()
1810            if self.state.has_key(key):
1811                try:
1812                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1813                            if f.has_key('fedid') ]
1814                except KeyError:
1815                    self.state_lock.release()
1816                    raise service_error(service_error.internal, 
1817                            "No fedid for experiment %s when checking " +\
1818                                    "access(!?)" % key)
1819                if len(kl) == 1:
1820                    key = kl[0]
1821                else:
1822                    self.state_lock.release()
1823                    raise service_error(service_error.internal, 
1824                            "multiple fedids for experiment %s when " +\
1825                                    "checking access(!?)" % key)
1826            else:
1827                self.state_lock.release()
1828                raise service_error(service_error.access, "Access Denied")
1829            self.state_lock.release()
1830
1831        if self.auth.check_attribute(fid, key):
1832            return True
1833        else:
1834            raise service_error(service_error.access, "Access Denied")
1835
1836
1837
1838    def get_vtopo(self, req, fid):
1839        """
1840        Return the stored virtual topology for this experiment
1841        """
1842        rv = None
1843
1844        req = req.get('VtopoRequestBody', None)
1845        if not req:
1846            raise service_error(service_error.req,
1847                    "Bad request format (no VtopoRequestBody)")
1848        exp = req.get('experiment', None)
1849        if exp:
1850            if exp.has_key('fedid'):
1851                key = exp['fedid']
1852                keytype = "fedid"
1853            elif exp.has_key('localname'):
1854                key = exp['localname']
1855                keytype = "localname"
1856            else:
1857                raise service_error(service_error.req, "Unknown lookup type")
1858        else:
1859            raise service_error(service_error.req, "No request?")
1860
1861        self.check_experiment_access(fid, key)
1862
1863        self.state_lock.acquire()
1864        if self.state.has_key(key):
1865            rv = { 'experiment' : {keytype: key },\
1866                    'vtopo': self.state[key]['vtopo'],\
1867                }
1868        self.state_lock.release()
1869
1870        if rv: return rv
1871        else: raise service_error(service_error.req, "No such experiment")
1872
1873    def get_vis(self, req, fid):
1874        """
1875        Return the stored visualization for this experiment
1876        """
1877        rv = None
1878
1879        req = req.get('VisRequestBody', None)
1880        if not req:
1881            raise service_error(service_error.req,
1882                    "Bad request format (no VisRequestBody)")
1883        exp = req.get('experiment', None)
1884        if exp:
1885            if exp.has_key('fedid'):
1886                key = exp['fedid']
1887                keytype = "fedid"
1888            elif exp.has_key('localname'):
1889                key = exp['localname']
1890                keytype = "localname"
1891            else:
1892                raise service_error(service_error.req, "Unknown lookup type")
1893        else:
1894            raise service_error(service_error.req, "No request?")
1895
1896        self.check_experiment_access(fid, key)
1897
1898        self.state_lock.acquire()
1899        if self.state.has_key(key):
1900            rv =  { 'experiment' : {keytype: key },\
1901                    'vis': self.state[key]['vis'],\
1902                    }
1903        self.state_lock.release()
1904
1905        if rv: return rv
1906        else: raise service_error(service_error.req, "No such experiment")
1907
1908    def get_info(self, req, fid):
1909        """
1910        Return all the stored info about this experiment
1911        """
1912        rv = None
1913
1914        req = req.get('InfoRequestBody', None)
1915        if not req:
1916            raise service_error(service_error.req,
1917                    "Bad request format (no VisRequestBody)")
1918        exp = req.get('experiment', None)
1919        if exp:
1920            if exp.has_key('fedid'):
1921                key = exp['fedid']
1922                keytype = "fedid"
1923            elif exp.has_key('localname'):
1924                key = exp['localname']
1925                keytype = "localname"
1926            else:
1927                raise service_error(service_error.req, "Unknown lookup type")
1928        else:
1929            raise service_error(service_error.req, "No request?")
1930
1931        self.check_experiment_access(fid, key)
1932
1933        # The state may be massaged by the service function that called
1934        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1935        # state.
1936        self.state_lock.acquire()
1937        if self.state.has_key(key):
1938            rv = copy.deepcopy(self.state[key])
1939        self.state_lock.release()
1940
1941        if rv: return rv
1942        else: raise service_error(service_error.req, "No such experiment")
1943
1944
1945    def terminate_experiment(self, req, fid):
1946        """
1947        Swap this experiment out on the federants and delete the shared
1948        information
1949        """
1950        tbparams = { }
1951        req = req.get('TerminateRequestBody', None)
1952        if not req:
1953            raise service_error(service_error.req,
1954                    "Bad request format (no TerminateRequestBody)")
1955        exp = req.get('experiment', None)
1956        if exp:
1957            if exp.has_key('fedid'):
1958                key = exp['fedid']
1959                keytype = "fedid"
1960            elif exp.has_key('localname'):
1961                key = exp['localname']
1962                keytype = "localname"
1963            else:
1964                raise service_error(service_error.req, "Unknown lookup type")
1965        else:
1966            raise service_error(service_error.req, "No request?")
1967
1968        self.check_experiment_access(fid, key)
1969
1970        self.state_lock.acquire()
1971        fed_exp = self.state.get(key, None)
1972
1973        if fed_exp:
1974            # This branch of the conditional holds the lock to generate a
1975            # consistent temporary tbparams variable to deallocate experiments.
1976            # It releases the lock to do the deallocations and reacquires it to
1977            # remove the experiment state when the termination is complete.
1978            ids = []
1979            #  experimentID is a list of dicts that are self-describing
1980            #  identifiers.  This finds all the fedids and localnames - the
1981            #  keys of self.state - and puts them into ids.
1982            for id in fed_exp.get('experimentID', []):
1983                if id.has_key('fedid'): ids.append(id['fedid'])
1984                if id.has_key('localname'): ids.append(id['localname'])
1985
1986            # Construct enough of the tbparams to make the stop_segment calls
1987            # work
1988            for fed in fed_exp['federant']:
1989                try:
1990                    for e in fed['name']:
1991                        eid = e.get('localname', None)
1992                        if eid: break
1993                    else:
1994                        continue
1995
1996                    p = fed['emulab']['project']
1997
1998                    project = p['name']['localname']
1999                    tb = p['testbed']['localname']
2000                    user = p['user'][0]['userID']['localname']
2001
2002                    domain = fed['emulab']['domain']
2003                    host  = "%s%s" % (fed['emulab']['ops'], domain)
2004                    aid = fed['allocID']
2005                except KeyError, e:
2006                    continue
2007                tbparams[tb] = {\
2008                        'user': user,\
2009                        'domain': domain,\
2010                        'project': project,\
2011                        'host': host,\
2012                        'eid': eid,\
2013                        'aid': aid,\
2014                    }
2015            self.state_lock.release()
2016
2017            # Stop everyone.
2018            for tb in tbparams.keys():
2019                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2020
2021            # release the allocations
2022            for tb in tbparams.keys():
2023                self.release_access(tb, tbparams[tb]['aid'])
2024
2025            # Remove the terminated experiment
2026            self.state_lock.acquire()
2027            for id in ids:
2028                if self.state.has_key(id): del self.state[id]
2029
2030            if self.state_filename: self.write_state()
2031            self.state_lock.release()
2032
2033            return { 'experiment': exp }
2034        else:
2035            # Don't forget to release the lock
2036            self.state_lock.release()
2037            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.