source: fedd/fedd/experiment_control.py @ ec4fb42

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since ec4fb42 was ec4fb42, checked in by Ted Faber <faber@…>, 15 years ago

Clean up some names that start with fedd_ that are ugly with the new package
structure. A couple other bugs cleaned up, too.

  • Property mode set to 100644
File size: 60.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26import parse_detail
27from service_error import service_error
28
29
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.experiment_control")
34fl.addHandler(nullHandler())
35
36class experiment_control_local:
37    """
38    Control of experiments that this system can directly access.
39
40    Includes experiment creation, termination and information dissemination.
41    Thred safe.
42    """
43   
44    class thread_pool:
45        """
46        A class to keep track of a set of threads all invoked for the same
47        task.  Manages the mutual exclusion of the states.
48        """
49        def __init__(self):
50            """
51            Start a pool.
52            """
53            self.changed = Condition()
54            self.started = 0
55            self.terminated = 0
56
57        def acquire(self):
58            """
59            Get the pool's lock.
60            """
61            self.changed.acquire()
62
63        def release(self):
64            """
65            Release the pool's lock.
66            """
67            self.changed.release()
68
69        def wait(self, timeout = None):
70            """
71            Wait for a pool thread to start or stop.
72            """
73            self.changed.wait(timeout)
74
75        def start(self):
76            """
77            Called by a pool thread to report starting.
78            """
79            self.changed.acquire()
80            self.started += 1
81            self.changed.notifyAll()
82            self.changed.release()
83
84        def terminate(self):
85            """
86            Called by a pool thread to report finishing.
87            """
88            self.changed.acquire()
89            self.terminated += 1
90            self.changed.notifyAll()
91            self.changed.release()
92
93        def clear(self):
94            """
95            Clear all pool data.
96            """
97            self.changed.acquire()
98            self.started = 0
99            self.terminated =0
100            self.changed.notifyAll()
101            self.changed.release()
102
103    class pooled_thread(Thread):
104        """
105        One of a set of threads dedicated to a specific task.  Uses the
106        thread_pool class above for coordination.
107        """
108        def __init__(self, group=None, target=None, name=None, args=(), 
109                kwargs={}, pdata=None, trace_file=None):
110            Thread.__init__(self, group, target, name, args, kwargs)
111            self.rv = None          # Return value of the ops in this thread
112            self.exception = None   # Exception that terminated this thread
113            self.target=target      # Target function to run on start()
114            self.args = args        # Args to pass to target
115            self.kwargs = kwargs    # Additional kw args
116            self.pdata = pdata      # thread_pool for this class
117            # Logger for this thread
118            self.log = logging.getLogger("fedd.experiment_control")
119       
120        def run(self):
121            """
122            Emulate Thread.run, except add pool data manipulation and error
123            logging.
124            """
125            if self.pdata:
126                self.pdata.start()
127
128            if self.target:
129                try:
130                    self.rv = self.target(*self.args, **self.kwargs)
131                except service_error, s:
132                    self.exception = s
133                    self.log.error("Thread exception: %s %s" % \
134                            (s.code_string(), s.desc))
135                except:
136                    self.exception = sys.exc_info()[1]
137                    self.log.error(("Unexpected thread exception: %s" +\
138                            "Trace %s") % (self.exception,\
139                                traceback.format_exc()))
140            if self.pdata:
141                self.pdata.terminate()
142
143    call_RequestAccess = service_caller('RequestAccess',
144            'getfeddPortType', feddServiceLocator, 
145            RequestAccessRequestMessage, 'RequestAccessRequestBody')
146
147    call_ReleaseAccess = service_caller('ReleaseAccess',
148            'getfeddPortType', feddServiceLocator, 
149            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
150
151    call_Ns2Split = service_caller('Ns2Split',
152            'getfeddInternalPortType', feddInternalServiceLocator, 
153            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
154
155    def __init__(self, config=None, auth=None):
156        """
157        Intialize the various attributes, most from the config object
158        """
159        self.thread_with_rv = experiment_control_local.pooled_thread
160        self.thread_pool = experiment_control_local.thread_pool
161
162        self.cert_file = None
163        self.cert_pwd = None
164        self.trusted_certs = None
165
166        # Walk through the various relevant certificat specifying config
167        # attributes until the local certificate attributes can be resolved.
168        # The walk is from most specific to most general specification.
169        for s in ("experiment_control", "globals"):
170            if config.has_section(s): 
171                if config.has_option(s, "cert_file"):
172                    if not self.cert_file:
173                        self.cert_file = config.get(s, "cert_file")
174                        self.cert_pwd = config.get(s, "cert_pwd")
175
176                if config.has_option(s, "trusted_certs"):
177                    if not self.trusted_certs:
178                        self.trusted_certs = config.get(s, "trusted_certs")
179
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        set_log_level(config, "experiment_control", self.log)
184        self.muxmax = 2
185        self.nthreads = 2
186        self.randomize_experiments = False
187
188        self.scp_exec = "/usr/bin/scp"
189        self.splitter = None
190        self.ssh_exec="/usr/bin/ssh"
191        self.ssh_keygen = "/usr/bin/ssh-keygen"
192        self.ssh_identity_file = None
193
194
195        self.debug = config.getboolean("experiment_control", "create_debug")
196        self.state_filename = config.get("experiment_control", 
197                "experiment_state_file")
198        self.splitter_url = config.get("experiment_control", "splitter_url")
199        self.fedkit = config.get("experiment_control", "fedkit")
200        accessdb_file = config.get("experiment_control", "accessdb")
201
202        self.ssh_pubkey_file = config.get("experiment_control", 
203                "ssh_pubkey_file")
204        self.ssh_privkey_file = config.get("experiment_control",
205                "ssh_privkey_file")
206        # NB for internal master/slave ops, not experiment setup
207        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
208        self.state = { }
209        self.state_lock = Lock()
210        self.tclsh = "/usr/local/bin/otclsh"
211        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
212        mapdb_file = config.get("experiment_control", "mapdb")
213        self.trace_file = sys.stderr
214
215        self.def_expstart = \
216                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
217                "/tmp/federate";
218        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
219                "FEDDIR/hosts";
220        self.def_gwstart = \
221                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
222                "/tmp/bridge.log";
223        self.def_mgwstart = \
224                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
225                "/tmp/bridge.log";
226        self.def_gwimage = "FBSD61-TUNNEL2";
227        self.def_gwtype = "pc";
228
229        if auth:
230            self.auth = auth
231        else:
232            self.log.error(\
233                    "[access]: No authorizer initialized, creating local one.")
234            auth = authorizer()
235
236
237        if self.ssh_pubkey_file:
238            try:
239                f = open(self.ssh_pubkey_file, 'r')
240                self.ssh_pubkey = f.read()
241                f.close()
242            except IOError:
243                raise service_error(service_error.internal,
244                        "Cannot read sshpubkey")
245        else:
246            raise service_error(service_error.internal, 
247                    "No SSH public key file?")
248
249        if not self.ssh_privkey_file:
250            raise service_error(service_error.internal, 
251                    "No SSH public key file?")
252
253
254        if mapdb_file:
255            self.read_mapdb(mapdb_file)
256        else:
257            self.log.warn("[experiment_control] No testbed map, using defaults")
258            self.tbmap = { 
259                    'deter':'https://users.isi.deterlab.net:23235',
260                    'emulab':'https://users.isi.deterlab.net:23236',
261                    'ucb':'https://users.isi.deterlab.net:23237',
262                    }
263
264        if accessdb_file:
265                self.read_accessdb(accessdb_file)
266        else:
267            raise service_error(service_error.internal,
268                    "No accessdb specified in config")
269
270        # Grab saved state.  OK to do this w/o locking because it's read only
271        # and only one thread should be in existence that can see self.state at
272        # this point.
273        if self.state_filename:
274            self.read_state()
275
276        # Dispatch tables
277        self.soap_services = {\
278                'Create': soap_handler(\
279                        CreateRequestMessage.typecode,
280                        getattr(self, "create_experiment"), 
281                        CreateResponseMessage,
282                        "CreateResponseBody"),
283                'Vtopo': soap_handler(\
284                        VtopoRequestMessage.typecode,
285                        getattr(self, "get_vtopo"),
286                        VtopoResponseMessage,
287                        "VtopoResponseBody"),
288                'Vis': soap_handler(\
289                        VisRequestMessage.typecode,
290                        getattr(self, "get_vis"),
291                        VisResponseMessage,
292                        "VisResponseBody"),
293                'Info': soap_handler(\
294                        InfoRequestMessage.typecode,
295                        getattr(self, "get_info"),
296                        InfoResponseMessage,
297                        "InfoResponseBody"),
298                'Terminate': soap_handler(\
299                        TerminateRequestMessage.typecode,
300                        getattr(self, "terminate_experiment"),
301                        TerminateResponseMessage,
302                        "TerminateResponseBody"),
303        }
304
305        self.xmlrpc_services = {\
306                'Create': xmlrpc_handler(\
307                        getattr(self, "create_experiment"), 
308                        "CreateResponseBody"),
309                'Vtopo': xmlrpc_handler(\
310                        getattr(self, "get_vtopo"),
311                        "VtopoResponseBody"),
312                'Vis': xmlrpc_handler(\
313                        getattr(self, "get_vis"),
314                        "VisResponseBody"),
315                'Info': xmlrpc_handler(\
316                        getattr(self, "get_info"),
317                        "InfoResponseBody"),
318                'Terminate': xmlrpc_handler(\
319                        getattr(self, "terminate_experiment"),
320                        "TerminateResponseBody"),
321        }
322
323    def copy_file(self, src, dest, size=1024):
324        """
325        Exceedingly simple file copy.
326        """
327        s = open(src,'r')
328        d = open(dest, 'w')
329
330        buf = "x"
331        while buf != "":
332            buf = s.read(size)
333            d.write(buf)
334        s.close()
335        d.close()
336
337    # Call while holding self.state_lock
338    def write_state(self):
339        """
340        Write a new copy of experiment state after copying the existing state
341        to a backup.
342
343        State format is a simple pickling of the state dictionary.
344        """
345        if os.access(self.state_filename, os.W_OK):
346            self.copy_file(self.state_filename, \
347                    "%s.bak" % self.state_filename)
348        try:
349            f = open(self.state_filename, 'w')
350            pickle.dump(self.state, f)
351        except IOError, e:
352            self.log.error("Can't write file %s: %s" % \
353                    (self.state_filename, e))
354        except pickle.PicklingError, e:
355            self.log.error("Pickling problem: %s" % e)
356        except TypeError, e:
357            self.log.error("Pickling problem (TypeError): %s" % e)
358
359    # Call while holding self.state_lock
360    def read_state(self):
361        """
362        Read a new copy of experiment state.  Old state is overwritten.
363
364        State format is a simple pickling of the state dictionary.
365        """
366        try:
367            f = open(self.state_filename, "r")
368            self.state = pickle.load(f)
369            self.log.debug("[read_state]: Read state from %s" % \
370                    self.state_filename)
371        except IOError, e:
372            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
373                    % (self.state_filename, e))
374        except pickle.UnpicklingError, e:
375            self.log.warning(("[read_state]: No saved state: " + \
376                    "Unpickling failed: %s") % e)
377       
378        for k in self.state.keys():
379            try:
380                # This list should only have one element in it, but phrasing it
381                # as a for loop doesn't cost much, really.  We have to find the
382                # fedid elements anyway.
383                for eid in [ f['fedid'] \
384                        for f in self.state[k]['experimentID']\
385                            if f.has_key('fedid') ]:
386                    self.auth.set_attribute(self.state[k]['owner'], eid)
387            except KeyError, e:
388                self.log.warning("[read_state]: State ownership or identity " +\
389                        "misformatted in %s: %s" % (self.state_filename, e))
390
391
392    def read_accessdb(self, accessdb_file):
393        """
394        Read the mapping from fedids that can create experiments to their name
395        in the 3-level access namespace.  All will be asserted from this
396        testbed and can include the local username and porject that will be
397        asserted on their behalf by this fedd.  Each fedid is also added to the
398        authorization system with the "create" attribute.
399        """
400        self.accessdb = {}
401        # These are the regexps for parsing the db
402        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
403        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
404                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
405        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
406                "\s*->\s*(" + name_expr + ")\s*$")
407        lineno = 0
408
409        # Parse the mappings and store in self.authdb, a dict of
410        # fedid -> (proj, user)
411        try:
412            f = open(accessdb_file, "r")
413            for line in f:
414                lineno += 1
415                line = line.strip()
416                if len(line) == 0 or line.startswith('#'):
417                    continue
418                m = project_line.match(line)
419                if m:
420                    fid = fedid(hexstr=m.group(1))
421                    project, user = m.group(2,3)
422                    if not self.accessdb.has_key(fid):
423                        self.accessdb[fid] = []
424                    self.accessdb[fid].append((project, user))
425                    continue
426
427                m = user_line.match(line)
428                if m:
429                    fid = fedid(hexstr=m.group(1))
430                    project = None
431                    user = m.group(2)
432                    if not self.accessdb.has_key(fid):
433                        self.accessdb[fid] = []
434                    self.accessdb[fid].append((project, user))
435                    continue
436                self.log.warn("[experiment_control] Error parsing access " +\
437                        "db %s at line %d" %  (accessdb_file, lineno))
438        except IOError:
439            raise service_error(service_error.internal,
440                    "Error opening/reading %s as experiment " +\
441                            "control accessdb" %  accessdb_file)
442        f.close()
443
444        # Initialize the authorization attributes
445        for fid in self.accessdb.keys():
446            self.auth.set_attribute(fid, 'create')
447
448    def read_mapdb(self, file):
449        """
450        Read a simple colon separated list of mappings for the
451        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
452        """
453
454        self.tbmap = { }
455        lineno =0
456        try:
457            f = open(file, "r")
458            for line in f:
459                lineno += 1
460                line = line.strip()
461                if line.startswith('#') or len(line) == 0:
462                    continue
463                try:
464                    label, url = line.split(':', 1)
465                    self.tbmap[label] = url
466                except ValueError, e:
467                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
468                            "map db: %s %s" % (lineno, line, e))
469        except IOError, e:
470            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
471                    "open %s: %s" % (file, e))
472        f.close()
473
474    def scp_file(self, file, user, host, dest=""):
475        """
476        scp a file to the remote host.  If debug is set the action is only
477        logged.
478        """
479
480        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
481                "%s@%s:%s" % (user, host, dest)]
482        rv = 0
483
484        try:
485            dnull = open("/dev/null", "r")
486        except IOError:
487            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
488            dnull = Null
489
490        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
491        if not self.debug:
492            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
493            else: rv = call(scp_cmd)
494
495        return rv == 0
496
497    def ssh_cmd(self, user, host, cmd, wname=None):
498        """
499        Run a remote command on host as user.  If debug is set, the action is
500        only logged.
501        """
502        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
503                user, host, cmd)
504
505        try:
506            dnull = open("/dev/null", "r")
507        except IOError:
508            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
509            dnull = Null
510
511        self.log.debug("[ssh_cmd]: %s" % sh_str)
512        if not self.debug:
513            if dnull:
514                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
515            else:
516                sub = Popen(sh_str, shell=True)
517            return sub.wait() == 0
518        else:
519            return True
520
521    def ship_configs(self, host, user, src_dir, dest_dir):
522        """
523        Copy federant-specific configuration files to the federant.
524        """
525        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
526            return False
527        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
528            return False
529
530        for f in os.listdir(src_dir):
531            if os.path.isdir(f):
532                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
533                        "%s/%s" % (dest_dir, f)):
534                    return False
535            else:
536                if not self.scp_file("%s/%s" % (src_dir, f), 
537                        user, host, dest_dir):
538                    return False
539        return True
540
541    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
542        """
543        Start a sub-experiment on a federant.
544
545        Get the current state, modify or create as appropriate, ship data and
546        configs and start the experiment.  There are small ordering differences
547        based on the initial state of the sub-experiment.
548        """
549        # ops node in the federant
550        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
551        user = tbparams[tb]['user']     # federant user
552        pid = tbparams[tb]['project']   # federant project
553        # XXX
554        base_confs = ( "hosts",)
555        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
556        # command to test experiment state
557        expinfo_exec = "/usr/testbed/bin/expinfo" 
558        # Configuration directories on the remote machine
559        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
560        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
561        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
562        # Regular expressions to parse the expinfo response
563        state_re = re.compile("State:\s+(\w+)")
564        no_exp_re = re.compile("^No\s+such\s+experiment")
565        state = None    # Experiment state parsed from expinfo
566        # The expinfo ssh command
567        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
568
569        # Get status
570        self.log.debug("[start_segment]: %s"% " ".join(cmd))
571        dev_null = None
572        try:
573            dev_null = open("/dev/null", "a")
574        except IOError, e:
575            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
576
577        if self.debug:
578            state = 'swapped'
579            rv = 0
580        else:
581            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
582            for line in status.stdout:
583                m = state_re.match(line)
584                if m: state = m.group(1)
585                else:
586                    m = no_exp_re.match(line)
587                    if m: state = "none"
588            rv = status.wait()
589
590        # If the experiment is not present the subcommand returns a non-zero
591        # return value.  If we successfully parsed a "none" outcome, ignore the
592        # return code.
593        if rv != 0 and state != "none":
594            raise service_error(service_error.internal,
595                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
596
597        self.log.debug("[start_segment]: %s: %s" % (tb, state))
598        self.log.info("[start_segment]:transferring experiment to %s" % tb)
599
600        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
601            return False
602        # Clear the federation files
603        if not self.ssh_cmd(user, host, 
604                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
605            return False
606        if not self.ssh_cmd(user, host, 
607                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
608            return False
609        # Clear and create the tarfiles and rpm directories
610        for d in (tarfiles_dir, rpms_dir):
611            if not self.ssh_cmd(user, host, 
612                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
613                return False
614            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
615                    "create tarfiles"):
616                return False
617       
618        if state == 'active':
619            # Remote experiment is active.  Modify it.
620            for f in base_confs:
621                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
622                        "%s/%s" % (proj_dir, f)):
623                    return False
624            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
625                    proj_dir):
626                return False
627            if os.path.isdir("%s/tarfiles" % tmpdir):
628                if not self.ship_configs(host, user,
629                        "%s/tarfiles" % tmpdir, tarfiles_dir):
630                    return False
631            if os.path.isdir("%s/rpms" % tmpdir):
632                if not self.ship_configs(host, user,
633                        "%s/rpms" % tmpdir, tarfiles_dir):
634                    return False
635            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
636            if not self.ssh_cmd(user, host,
637                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
638                            (pid, eid, tclfile), "modexp"):
639                return False
640            return True
641        elif state == "swapped":
642            # Remote experiment swapped out.  Modify it and swap it in.
643            for f in base_confs:
644                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
645                        "%s/%s" % (proj_dir, f)):
646                    return False
647            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
648                    proj_dir):
649                return False
650            if os.path.isdir("%s/tarfiles" % tmpdir):
651                if not self.ship_configs(host, user,
652                        "%s/tarfiles" % tmpdir, tarfiles_dir):
653                    return False
654            if os.path.isdir("%s/rpms" % tmpdir):
655                if not self.ship_configs(host, user,
656                        "%s/rpms" % tmpdir, tarfiles_dir):
657                    return False
658            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
659            if not self.ssh_cmd(user, host,
660                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
661                    "modexp"):
662                return False
663            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
664            if not self.ssh_cmd(user, host,
665                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
666                    "swapexp"):
667                return False
668            return True
669        elif state == "none":
670            # No remote experiment.  Create one.  We do this in 2 steps so we
671            # can put the configuration files and scripts into the new
672            # experiment directories.
673
674            # Tarfiles must be present for creation to work
675            if os.path.isdir("%s/tarfiles" % tmpdir):
676                if not self.ship_configs(host, user,
677                        "%s/tarfiles" % tmpdir, tarfiles_dir):
678                    return False
679            if os.path.isdir("%s/rpms" % tmpdir):
680                if not self.ship_configs(host, user,
681                        "%s/rpms" % tmpdir, tarfiles_dir):
682                    return False
683            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
684            if not self.ssh_cmd(user, host,
685                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
686                            (pid, eid, tclfile), "startexp"):
687                return False
688            # After startexp the per-experiment directories exist
689            for f in base_confs:
690                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
691                        "%s/%s" % (proj_dir, f)):
692                    return False
693            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
694                    proj_dir):
695                return False
696            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
697            if not self.ssh_cmd(user, host,
698                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
699                    "swapexp"):
700                return False
701            return True
702        else:
703            self.log.debug("[start_segment]:unknown state %s" % state)
704            return False
705
706    def stop_segment(self, tb, eid, tbparams):
707        """
708        Stop a sub experiment by calling swapexp on the federant
709        """
710        user = tbparams[tb]['user']
711        host = tbparams[tb]['host']
712        pid = tbparams[tb]['project']
713
714        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
715        return self.ssh_cmd(user, host,
716                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
717
718       
719    def generate_ssh_keys(self, dest, type="rsa" ):
720        """
721        Generate a set of keys for the gateways to use to talk.
722
723        Keys are of type type and are stored in the required dest file.
724        """
725        valid_types = ("rsa", "dsa")
726        t = type.lower();
727        if t not in valid_types: raise ValueError
728        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
729
730        try:
731            trace = open("/dev/null", "w")
732        except IOError:
733            raise service_error(service_error.internal,
734                    "Cannot open /dev/null??");
735
736        # May raise CalledProcessError
737        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
738        rv = call(cmd, stdout=trace, stderr=trace)
739        if rv != 0:
740            raise service_error(service_error.internal, 
741                    "Cannot generate nonce ssh keys.  %s return code %d" \
742                            % (self.ssh_keygen, rv))
743
744    def gentopo(self, str):
745        """
746        Generate the topology dtat structure from the splitter's XML
747        representation of it.
748
749        The topology XML looks like:
750            <experiment>
751                <nodes>
752                    <node><vname></vname><ips>ip1:ip2</ips></node>
753                </nodes>
754                <lans>
755                    <lan>
756                        <vname></vname><vnode></vnode><ip></ip>
757                        <bandwidth></bandwidth><member>node:port</member>
758                    </lan>
759                </lans>
760        """
761        class topo_parse:
762            """
763            Parse the topology XML and create the dats structure.
764            """
765            def __init__(self):
766                # Typing of the subelements for data conversion
767                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
768                self.int_subelements = ( 'bandwidth',)
769                self.float_subelements = ( 'delay',)
770                # The final data structure
771                self.nodes = [ ]
772                self.lans =  [ ]
773                self.topo = { \
774                        'node': self.nodes,\
775                        'lan' : self.lans,\
776                    }
777                self.element = { }  # Current element being created
778                self.chars = ""     # Last text seen
779
780            def end_element(self, name):
781                # After each sub element the contents is added to the current
782                # element or to the appropriate list.
783                if name == 'node':
784                    self.nodes.append(self.element)
785                    self.element = { }
786                elif name == 'lan':
787                    self.lans.append(self.element)
788                    self.element = { }
789                elif name in self.str_subelements:
790                    self.element[name] = self.chars
791                    self.chars = ""
792                elif name in self.int_subelements:
793                    self.element[name] = int(self.chars)
794                    self.chars = ""
795                elif name in self.float_subelements:
796                    self.element[name] = float(self.chars)
797                    self.chars = ""
798
799            def found_chars(self, data):
800                self.chars += data.rstrip()
801
802
803        tp = topo_parse();
804        parser = xml.parsers.expat.ParserCreate()
805        parser.EndElementHandler = tp.end_element
806        parser.CharacterDataHandler = tp.found_chars
807
808        parser.Parse(str)
809
810        return tp.topo
811       
812
813    def genviz(self, topo):
814        """
815        Generate the visualization the virtual topology
816        """
817
818        neato = "/usr/local/bin/neato"
819        # These are used to parse neato output and to create the visualization
820        # file.
821        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
822        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
823                "%s</type></node>"
824
825        try:
826            # Node names
827            nodes = [ n['vname'] for n in topo['node'] ]
828            topo_lans = topo['lan']
829        except KeyError:
830            raise service_error(service_error.internal, "Bad topology")
831
832        lans = { }
833        links = { }
834
835        # Walk through the virtual topology, organizing the connections into
836        # 2-node connections (links) and more-than-2-node connections (lans).
837        # When a lan is created, it's added to the list of nodes (there's a
838        # node in the visualization for the lan).
839        for l in topo_lans:
840            if links.has_key(l['vname']):
841                if len(links[l['vname']]) < 2:
842                    links[l['vname']].append(l['vnode'])
843                else:
844                    nodes.append(l['vname'])
845                    lans[l['vname']] = links[l['vname']]
846                    del links[l['vname']]
847                    lans[l['vname']].append(l['vnode'])
848            elif lans.has_key(l['vname']):
849                lans[l['vname']].append(l['vnode'])
850            else:
851                links[l['vname']] = [ l['vnode'] ]
852
853
854        # Open up a temporary file for dot to turn into a visualization
855        try:
856            df, dotname = tempfile.mkstemp()
857            dotfile = os.fdopen(df, 'w')
858        except IOError:
859            raise service_error(service_error.internal,
860                    "Failed to open file in genviz")
861
862        # Generate a dot/neato input file from the links, nodes and lans
863        try:
864            print >>dotfile, "graph G {"
865            for n in nodes:
866                print >>dotfile, '\t"%s"' % n
867            for l in links.keys():
868                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
869            for l in lans.keys():
870                for n in lans[l]:
871                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
872            print >>dotfile, "}"
873            dotfile.close()
874        except TypeError:
875            raise service_error(service_error.internal,
876                    "Single endpoint link in vtopo")
877        except IOError:
878            raise service_error(service_error.internal, "Cannot write dot file")
879
880        # Use dot to create a visualization
881        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
882                '-Gpack=true', dotname], stdout=PIPE)
883
884        # Translate dot to vis format
885        vis_nodes = [ ]
886        vis = { 'node': vis_nodes }
887        for line in dot.stdout:
888            m = vis_re.match(line)
889            if m:
890                vn = m.group(1)
891                vis_node = {'name': vn, \
892                        'x': float(m.group(2)),\
893                        'y' : float(m.group(3)),\
894                    }
895                if vn in links.keys() or vn in lans.keys():
896                    vis_node['type'] = 'lan'
897                else:
898                    vis_node['type'] = 'node'
899                vis_nodes.append(vis_node)
900        rv = dot.wait()
901
902        os.remove(dotname)
903        if rv == 0 : return vis
904        else: return None
905
906    def get_access(self, tb, nodes, user, tbparam, master, export_project,
907            access_user):
908        """
909        Get access to testbed through fedd and set the parameters for that tb
910        """
911
912        translate_attr = {
913            'slavenodestartcmd': 'expstart',
914            'slaveconnectorstartcmd': 'gwstart',
915            'masternodestartcmd': 'mexpstart',
916            'masterconnectorstartcmd': 'mgwstart',
917            'connectorimage': 'gwimage',
918            'connectortype': 'gwtype',
919            'tunnelcfg': 'tun',
920            'smbshare': 'smbshare',
921        }
922
923        uri = self.tbmap.get(tb, None)
924        if not uri:
925            raise service_error(serice_error.server_config, 
926                    "Unknown testbed: %s" % tb)
927
928        # currently this lumps all users into one service access group
929        service_keys = [ a for u in user \
930                for a in u.get('access', []) \
931                    if a.has_key('sshPubkey')]
932
933        if len(service_keys) == 0:
934            raise service_error(service_error.req, 
935                    "Must have at least one SSH pubkey for services")
936
937
938        for p, u in access_user:
939            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
940                    "to %s") %  ((p or "None"), u, uri))
941
942            if p:
943                # Request with user and project specified
944                req = {\
945                        'destinationTestbed' : { 'uri' : uri },
946                        'project': { 
947                            'name': {'localname': p},
948                            'user': [ {'userID': { 'localname': u } } ],
949                            },
950                        'user':  user,
951                        'allocID' : { 'localname': 'test' },
952                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
953                        'serviceAccess' : service_keys
954                    }
955            else:
956                # Request with only user specified
957                req = {\
958                        'destinationTestbed' : { 'uri' : uri },
959                        'user':  [ {'userID': { 'localname': u } } ],
960                        'allocID' : { 'localname': 'test' },
961                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
962                        'serviceAccess' : service_keys
963                    }
964
965            if tb == master:
966                # NB, the export_project parameter is a dict that includes
967                # the type
968                req['exportProject'] = export_project
969
970            # node resources if any
971            if nodes != None and len(nodes) > 0:
972                rnodes = [ ]
973                for n in nodes:
974                    rn = { }
975                    image, hw, count = n.split(":")
976                    if image: rn['image'] = [ image ]
977                    if hw: rn['hardware'] = [ hw ]
978                    if count: rn['count'] = int(count)
979                    rnodes.append(rn)
980                req['resources']= { }
981                req['resources']['node'] = rnodes
982
983            try:
984                r = self.call_RequestAccess(uri, req, 
985                        self.cert_file, self.cert_pwd, self.trusted_certs)
986            except service_error, e:
987                if e.code == service_error.access:
988                    self.log.debug("[get_access] Access denied")
989                    r = None
990                    continue
991                else:
992                    raise e
993
994            if r.has_key('RequestAccessResponseBody'):
995                # Through to here we have a valid response, not a fault.
996                # Access denied is a fault, so something better or worse than
997                # access denied has happened.
998                r = r['RequestAccessResponseBody']
999                self.log.debug("[get_access] Access granted")
1000                break
1001            else:
1002                raise service_error(service_error.protocol,
1003                        "Bad proxy response")
1004       
1005        if not r:
1006            raise service_error(service_error.access, 
1007                    "Access denied by %s (%s)" % (tb, uri))
1008
1009        e = r['emulab']
1010        p = e['project']
1011        tbparam[tb] = { 
1012                "boss": e['boss'],
1013                "host": e['ops'],
1014                "domain": e['domain'],
1015                "fs": e['fileServer'],
1016                "eventserver": e['eventServer'],
1017                "project": unpack_id(p['name']),
1018                "emulab" : e,
1019                "allocID" : r['allocID'],
1020                }
1021        # Make the testbed name be the label the user applied
1022        p['testbed'] = {'localname': tb }
1023
1024        for u in p['user']:
1025            tbparam[tb]['user'] = unpack_id(u['userID'])
1026
1027        for a in e['fedAttr']:
1028            if a['attribute']:
1029                key = translate_attr.get(a['attribute'].lower(), None)
1030                if key:
1031                    tbparam[tb][key]= a['value']
1032       
1033    def release_access(self, tb, aid):
1034        """
1035        Release access to testbed through fedd
1036        """
1037
1038        uri = self.tbmap.get(tb, None)
1039        if not uri:
1040            raise service_error(serice_error.server_config, 
1041                    "Unknown testbed: %s" % tb)
1042
1043        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1044                self.cert_file, self.cert_pwd, self.trusted_certs)
1045
1046        # better error coding
1047
1048    def remote_splitter(self, uri, desc, master):
1049
1050        req = {
1051                'description' : { 'ns2description': desc },
1052                'master': master,
1053                'include_fedkit': bool(self.fedkit)
1054            }
1055
1056        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1057                self.trusted_certs)
1058
1059        if r.has_key('Ns2SplitResponseBody'):
1060            r = r['Ns2SplitResponseBody']
1061            if r.has_key('output'):
1062                return r['output'].splitlines()
1063            else:
1064                raise service_error(service_error.protocol, 
1065                        "Bad splitter response (no output)")
1066        else:
1067            raise service_error(service_error.protocol, "Bad splitter response")
1068       
1069    class current_testbed:
1070        """
1071        Object for collecting the current testbed description.  The testbed
1072        description is saved to a file with the local testbed variables
1073        subsittuted line by line.
1074        """
1075        def __init__(self, eid, tmpdir, fedkit):
1076            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1077            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1078            self.current_testbed = None
1079            self.testbed_file = None
1080
1081            self.def_expstart = \
1082                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1083            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1084            self.def_gwstart = \
1085                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1086            self.def_mgwstart = \
1087                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1088            self.def_gwimage = "FBSD61-TUNNEL2";
1089            self.def_gwtype = "pc";
1090
1091            self.eid = eid
1092            self.tmpdir = tmpdir
1093            self.fedkit = fedkit
1094
1095        def __call__(self, line, master, allocated, tbparams):
1096            # Capture testbed topology descriptions
1097            if self.current_testbed == None:
1098                m = self.begin_testbed.match(line)
1099                if m != None:
1100                    self.current_testbed = m.group(1)
1101                    if self.current_testbed == None:
1102                        raise service_error(service_error.req,
1103                                "Bad request format (unnamed testbed)")
1104                    allocated[self.current_testbed] = \
1105                            allocated.get(self.current_testbed,0) + 1
1106                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1107                    if not os.path.exists(tb_dir):
1108                        try:
1109                            os.mkdir(tb_dir)
1110                        except IOError:
1111                            raise service_error(service_error.internal,
1112                                    "Cannot create %s" % tb_dir)
1113                    try:
1114                        self.testbed_file = open("%s/%s.%s.tcl" %
1115                                (tb_dir, self.eid, self.current_testbed), 'w')
1116                    except IOError:
1117                        self.testbed_file = None
1118                    return True
1119                else: return False
1120            else:
1121                m = self.end_testbed.match(line)
1122                if m != None:
1123                    if m.group(1) != self.current_testbed:
1124                        raise service_error(service_error.internal, 
1125                                "Mismatched testbed markers!?")
1126                    if self.testbed_file != None: 
1127                        self.testbed_file.close()
1128                        self.testbed_file = None
1129                    self.current_testbed = None
1130                elif self.testbed_file:
1131                    # Substitute variables and put the line into the local
1132                    # testbed file.
1133                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1134                            self.def_gwtype)
1135                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1136                            self.def_gwimage)
1137                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1138                            self.def_mgwstart)
1139                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1140                            self.def_mexpstart)
1141                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1142                            self.def_gwstart)
1143                    expstart = tbparams[self.current_testbed].get('expstart', 
1144                            self.def_expstart)
1145                    project = tbparams[self.current_testbed].get('project')
1146                    line = re.sub("GWTYPE", gwtype, line)
1147                    line = re.sub("GWIMAGE", gwimage, line)
1148                    if self.current_testbed == master:
1149                        line = re.sub("GWSTART", mgwstart, line)
1150                        line = re.sub("EXPSTART", mexpstart, line)
1151                    else:
1152                        line = re.sub("GWSTART", gwstart, line)
1153                        line = re.sub("EXPSTART", expstart, line)
1154                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1155                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1156                    line = re.sub("EID", self.eid, line)
1157                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1158                            (project, self.eid), line)
1159                    if self.fedkit:
1160                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1161                                line)
1162                    print >>self.testbed_file, line
1163                return True
1164
1165    class allbeds:
1166        """
1167        Process the Allbeds section.  Get access to each federant and save the
1168        parameters in tbparams
1169        """
1170        def __init__(self, get_access):
1171            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1172            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1173            self.in_allbeds = False
1174            self.get_access = get_access
1175
1176        def __call__(self, line, user, tbparams, master, export_project,
1177                access_user):
1178            # Testbed access parameters
1179            if not self.in_allbeds:
1180                if self.begin_allbeds.match(line):
1181                    self.in_allbeds = True
1182                    return True
1183                else:
1184                    return False
1185            else:
1186                if self.end_allbeds.match(line):
1187                    self.in_allbeds = False
1188                else:
1189                    nodes = line.split('|')
1190                    tb = nodes.pop(0)
1191                    self.get_access(tb, nodes, user, tbparams, master,
1192                            export_project, access_user)
1193                return True
1194
1195    class gateways:
1196        def __init__(self, eid, master, tmpdir, gw_pubkey,
1197                gw_secretkey, copy_file, fedkit):
1198            self.begin_gateways = \
1199                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1200            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1201            self.current_gateways = None
1202            self.control_gateway = None
1203            self.active_end = { }
1204
1205            self.eid = eid
1206            self.master = master
1207            self.tmpdir = tmpdir
1208            self.gw_pubkey_base = gw_pubkey
1209            self.gw_secretkey_base = gw_secretkey
1210
1211            self.copy_file = copy_file
1212            self.fedkit = fedkit
1213
1214
1215        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1216                active_end, tbparams, dtb, myname, desthost, type):
1217            """
1218            Produce a gateway configuration file from a gateways line.
1219            """
1220
1221            sproject = tbparams[gw].get('project', 'project')
1222            dproject = tbparams[dtb].get('project', 'project')
1223            sdomain = ".%s.%s%s" % (eid, sproject,
1224                    tbparams[gw].get('domain', ".example.com"))
1225            ddomain = ".%s.%s%s" % (eid, dproject,
1226                    tbparams[dtb].get('domain', ".example.com"))
1227            boss = tbparams[master].get('boss', "boss")
1228            fs = tbparams[master].get('fs', "fs")
1229            event_server = "%s%s" % \
1230                    (tbparams[gw].get('eventserver', "event_server"),
1231                            tbparams[gw].get('domain', "example.com"))
1232            remote_event_server = "%s%s" % \
1233                    (tbparams[dtb].get('eventserver', "event_server"),
1234                            tbparams[dtb].get('domain', "example.com"))
1235            seer_control = "%s%s" % \
1236                    (tbparams[gw].get('control', "control"), sdomain)
1237
1238            if self.fedkit:
1239                remote_script_dir = "/usr/local/federation/bin"
1240                local_script_dir = "/usr/local/federation/bin"
1241            else:
1242                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1243                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1244
1245            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1246            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1247            tunnel_cfg = tbparams[gw].get("tun", "false")
1248
1249            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1250            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1251
1252            # translate to lower case so the `hostname` hack for specifying
1253            # configuration files works.
1254            conf_file = conf_file.lower();
1255            remote_conf_file = remote_conf_file.lower();
1256
1257            if dtb == master:
1258                active = "false"
1259            elif gw == master:
1260                active = "true"
1261            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1262                active = "false"
1263            else:
1264                active_end['%s-%s' % (gw, dtb)] = 1
1265                active = "true"
1266
1267            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1268            print >>gwconfig, "Active: %s" % active
1269            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1270            print >>gwconfig, "BossName: %s" % boss
1271            print >>gwconfig, "FsName: %s" % fs
1272            print >>gwconfig, "EventServerName: %s" % event_server
1273            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1274            print >>gwconfig, "SeerControl: %s" % seer_control
1275            print >>gwconfig, "Type: %s" % type
1276            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1277            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1278                    local_script_dir
1279            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1280            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1281            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1282                    (remote_conf_dir, remote_conf_file)
1283            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1284            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1285            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1286            gwconfig.close()
1287
1288            return active == "true"
1289
1290        def __call__(self, line, allocated, tbparams):
1291            # Process gateways
1292            if not self.current_gateways:
1293                m = self.begin_gateways.match(line)
1294                if m:
1295                    self.current_gateways = m.group(1)
1296                    if allocated.has_key(self.current_gateways):
1297                        # This test should always succeed
1298                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1299                        if not os.path.exists(tb_dir):
1300                            try:
1301                                os.mkdir(tb_dir)
1302                            except IOError:
1303                                raise service_error(service_error.internal,
1304                                        "Cannot create %s" % tb_dir)
1305                    else:
1306                        # XXX
1307                        self.log.error("[gateways]: Ignoring gateways for " + \
1308                                "unknown testbed %s" % self.current_gateways)
1309                        self.current_gateways = None
1310                    return True
1311                else:
1312                    return False
1313            else:
1314                m = self.end_gateways.match(line)
1315                if m :
1316                    if m.group(1) != self.current_gateways:
1317                        raise service_error(service_error.internal,
1318                                "Mismatched gateway markers!?")
1319                    if self.control_gateway:
1320                        try:
1321                            cc = open("%s/%s/client.conf" %
1322                                    (self.tmpdir, self.current_gateways), 'w')
1323                            print >>cc, "ControlGateway: %s" % \
1324                                    self.control_gateway
1325                            if tbparams[self.master].has_key('smbshare'):
1326                                print >>cc, "SMBSHare: %s" % \
1327                                        tbparams[self.master]['smbshare']
1328                            print >>cc, "ProjectUser: %s" % \
1329                                    tbparams[self.master]['user']
1330                            print >>cc, "ProjectName: %s" % \
1331                                    tbparams[self.master]['project']
1332                            cc.close()
1333                        except IOError:
1334                            raise service_error(service_error.internal,
1335                                    "Error creating client config")
1336                        try:
1337                            cc = open("%s/%s/seer.conf" %
1338                                    (self.tmpdir, self.current_gateways),
1339                                    'w')
1340                            if self.current_gateways != self.master:
1341                                print >>cc, "ControlNode: %s" % \
1342                                        self.control_gateway
1343                            print >>cc, "ExperimentID: %s/%s" % \
1344                                    ( tbparams[self.master]['project'], \
1345                                    self.eid )
1346                            cc.close()
1347                        except IOError:
1348                            raise service_error(service_error.internal,
1349                                    "Error creating seer config")
1350                    else:
1351                        debug.error("[gateways]: No control gateway for %s" %\
1352                                    self.current_gateways)
1353                    self.current_gateways = None
1354                else:
1355                    dtb, myname, desthost, type = line.split(" ")
1356
1357                    if type == "control" or type == "both":
1358                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1359                                self.eid, 
1360                                tbparams[self.current_gateways]['project'],
1361                                tbparams[self.current_gateways]['domain'])
1362                    try:
1363                        active = self.gateway_conf_file(self.current_gateways,
1364                                self.master, self.eid, self.gw_pubkey_base,
1365                                self.gw_secretkey_base,
1366                                self.active_end, tbparams, dtb, myname,
1367                                desthost, type)
1368                    except IOError, e:
1369                        raise service_error(service_error.internal,
1370                                "Failed to write config file for %s" % \
1371                                        self.current_gateway)
1372           
1373                    gw_pubkey = "%s/keys/%s" % \
1374                            (self.tmpdir, self.gw_pubkey_base)
1375                    gw_secretkey = "%s/keys/%s" % \
1376                            (self.tmpdir, self.gw_secretkey_base)
1377
1378                    pkfile = "%s/%s/%s" % \
1379                            ( self.tmpdir, self.current_gateways, 
1380                                    self.gw_pubkey_base)
1381                    skfile = "%s/%s/%s" % \
1382                            ( self.tmpdir, self.current_gateways, 
1383                                    self.gw_secretkey_base)
1384
1385                    if not os.path.exists(pkfile):
1386                        try:
1387                            self.copy_file(gw_pubkey, pkfile)
1388                        except IOError:
1389                            service_error(service_error.internal,
1390                                    "Failed to copy pubkey file")
1391
1392                    if active and not os.path.exists(skfile):
1393                        try:
1394                            self.copy_file(gw_secretkey, skfile)
1395                        except IOError:
1396                            service_error(service_error.internal,
1397                                    "Failed to copy secretkey file")
1398                return True
1399
1400    class shunt_to_file:
1401        """
1402        Simple class to write data between two regexps to a file.
1403        """
1404        def __init__(self, begin, end, filename):
1405            """
1406            Begin shunting on a match of begin, stop on end, send data to
1407            filename.
1408            """
1409            self.begin = re.compile(begin)
1410            self.end = re.compile(end)
1411            self.in_shunt = False
1412            self.file = None
1413            self.filename = filename
1414
1415        def __call__(self, line):
1416            """
1417            Call this on each line in the input that may be shunted.
1418            """
1419            if not self.in_shunt:
1420                if self.begin.match(line):
1421                    self.in_shunt = True
1422                    try:
1423                        self.file = open(self.filename, "w")
1424                    except:
1425                        self.file = None
1426                        raise
1427                    return True
1428                else:
1429                    return False
1430            else:
1431                if self.end.match(line):
1432                    if self.file: 
1433                        self.file.close()
1434                        self.file = None
1435                    self.in_shunt = False
1436                else:
1437                    if self.file:
1438                        print >>self.file, line
1439                return True
1440
1441    class shunt_to_list:
1442        """
1443        Same interface as shunt_to_file.  Data collected in self.list, one list
1444        element per line.
1445        """
1446        def __init__(self, begin, end):
1447            self.begin = re.compile(begin)
1448            self.end = re.compile(end)
1449            self.in_shunt = False
1450            self.list = [ ]
1451       
1452        def __call__(self, line):
1453            if not self.in_shunt:
1454                if self.begin.match(line):
1455                    self.in_shunt = True
1456                    return True
1457                else:
1458                    return False
1459            else:
1460                if self.end.match(line):
1461                    self.in_shunt = False
1462                else:
1463                    self.list.append(line)
1464                return True
1465
1466    class shunt_to_string:
1467        """
1468        Same interface as shunt_to_file.  Data collected in self.str, all in
1469        one string.
1470        """
1471        def __init__(self, begin, end):
1472            self.begin = re.compile(begin)
1473            self.end = re.compile(end)
1474            self.in_shunt = False
1475            self.str = ""
1476       
1477        def __call__(self, line):
1478            if not self.in_shunt:
1479                if self.begin.match(line):
1480                    self.in_shunt = True
1481                    return True
1482                else:
1483                    return False
1484            else:
1485                if self.end.match(line):
1486                    self.in_shunt = False
1487                else:
1488                    self.str += line
1489                return True
1490
1491    def create_experiment(self, req, fid):
1492        """
1493        The external interface to experiment creation called from the
1494        dispatcher.
1495
1496        Creates a working directory, splits the incoming description using the
1497        splitter script and parses out the avrious subsections using the
1498        lcasses above.  Once each sub-experiment is created, use pooled threads
1499        to instantiate them and start it all up.
1500        """
1501
1502        if not self.auth.check_attribute(fid, 'create'):
1503            raise service_error(service_error.access, "Create access denied")
1504
1505        try:
1506            tmpdir = tempfile.mkdtemp(prefix="split-")
1507        except IOError:
1508            raise service_error(service_error.internal, "Cannot create tmp dir")
1509
1510        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1511        gw_secretkey_base = "fed.%s" % self.ssh_type
1512        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1513        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1514        tclfile = tmpdir + "/experiment.tcl"
1515        tbparams = { }
1516        try:
1517            access_user = self.accessdb[fid]
1518        except KeyError:
1519            raise service_error(service_error.internal,
1520                    "Access map and authorizer out of sync in " + \
1521                            "create_experiment for fedid %s"  % fid)
1522
1523        pid = "dummy"
1524        gid = "dummy"
1525        # XXX
1526        fail_soft = False
1527
1528        try:
1529            os.mkdir(tmpdir+"/keys")
1530        except OSError:
1531            raise service_error(service_error.internal,
1532                    "Can't make temporary dir")
1533
1534        req = req.get('CreateRequestBody', None)
1535        if not req:
1536            raise service_error(service_error.req,
1537                    "Bad request format (no CreateRequestBody)")
1538        # The tcl parser needs to read a file so put the content into that file
1539        descr=req.get('experimentdescription', None)
1540        if descr:
1541            file_content=descr.get('ns2description', None)
1542            if file_content:
1543                try:
1544                    f = open(tclfile, 'w')
1545                    f.write(file_content)
1546                    f.close()
1547                except IOError:
1548                    raise service_error(service_error.internal,
1549                            "Cannot write temp experiment description")
1550            else:
1551                raise service_error(service_error.req, 
1552                        "Only ns2descriptions supported")
1553        else:
1554            raise service_error(service_error.req, "No experiment description")
1555
1556        if req.has_key('experimentID') and \
1557                req['experimentID'].has_key('localname'):
1558            eid = req['experimentID']['localname']
1559            self.state_lock.acquire()
1560            while (self.state.has_key(eid)):
1561                eid += random.choice(string.ascii_letters)
1562            # To avoid another thread picking this localname
1563            self.state[eid] = "placeholder"
1564            self.state_lock.release()
1565        else:
1566            eid = self.exp_stem
1567            for i in range(0,5):
1568                eid += random.choice(string.ascii_letters)
1569            self.state_lock.acquire()
1570            while (self.state.has_key(eid)):
1571                eid = self.exp_stem
1572                for i in range(0,5):
1573                    eid += random.choice(string.ascii_letters)
1574            # To avoid another thread picking this localname
1575            self.state[eid] = "placeholder"
1576            self.state_lock.release()
1577
1578        try:
1579            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1580        except ValueError:
1581            raise service_error(service_error.server_config, 
1582                    "Bad key type (%s)" % self.ssh_type)
1583
1584        user = req.get('user', None)
1585        if user == None:
1586            raise service_error(service_error.req, "No user")
1587
1588        master = req.get('master', None)
1589        if not master:
1590            raise service_error(service_error.req, "No master testbed label")
1591        export_project = req.get('exportProject', None)
1592        if not export_project:
1593            raise service_error(service_error.req, "No export project")
1594       
1595        if self.splitter_url:
1596            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1597            split_data = self.remote_splitter(self.splitter_url, file_content,
1598                    master)
1599        else:
1600            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1601                str(self.muxmax), '-m', master]
1602
1603            if self.fedkit:
1604                tclcmd.append('-k')
1605
1606            tclcmd.extend([pid, gid, eid, tclfile])
1607
1608            self.log.debug("running local splitter %s", " ".join(tclcmd))
1609            tclparser = Popen(tclcmd, stdout=PIPE)
1610            split_data = tclparser.stdout
1611
1612        allocated = { }     # Testbeds we can access
1613        started = { }       # Testbeds where a sub-experiment started
1614                            # successfully
1615
1616        # Objects to parse the splitter output (defined above)
1617        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1618        parse_allbeds = self.allbeds(self.get_access)
1619        parse_gateways = self.gateways(eid, master, tmpdir,
1620                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1621        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1622                    "^#\s+End\s+Vtopo")
1623        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1624                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1625        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1626                "^#\s+End\s+tarfiles")
1627        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1628                "^#\s+End\s+rpms")
1629
1630        try: 
1631            # Working on the split data
1632            for line in split_data:
1633                line = line.rstrip()
1634                if parse_current_testbed(line, master, allocated, tbparams):
1635                    continue
1636                elif parse_allbeds(line, user, tbparams, master, export_project,
1637                        access_user):
1638                    continue
1639                elif parse_gateways(line, allocated, tbparams):
1640                    continue
1641                elif parse_vtopo(line):
1642                    continue
1643                elif parse_hostnames(line):
1644                    continue
1645                elif parse_tarfiles(line):
1646                    continue
1647                elif parse_rpms(line):
1648                    continue
1649                else:
1650                    raise service_error(service_error.internal, 
1651                            "Bad tcl parse? %s" % line)
1652        except service_error, e:
1653            # If something goes wrong in the parse (usually an access error)
1654            # clear the placeholder state
1655            self.state_lock.acquire()
1656            del self.state[eid]
1657            self.state_lock.release()
1658            raise e
1659       
1660        # Virtual topology and visualization
1661        vtopo = self.gentopo(parse_vtopo.str)
1662        if not vtopo:
1663            raise service_error(service_error.internal, 
1664                    "Failed to generate virtual topology")
1665
1666        vis = self.genviz(vtopo)
1667        if not vis:
1668            raise service_error(service_error.internal, 
1669                    "Failed to generate visualization")
1670
1671        # save federant information
1672        for k in allocated.keys():
1673            tbparams[k]['federant'] = {\
1674                    'name': [ { 'localname' : eid} ],\
1675                    'emulab': tbparams[k]['emulab'],\
1676                    'allocID' : tbparams[k]['allocID'],\
1677                    'master' : k == master,\
1678                }
1679
1680
1681        # Copy tarfiles and rpms needed at remote sites into a staging area
1682        try:
1683            if self.fedkit:
1684                parse_tarfiles.list.append(self.fedkit)
1685            for t in parse_tarfiles.list:
1686                if not os.path.exists("%s/tarfiles" % tmpdir):
1687                    os.mkdir("%s/tarfiles" % tmpdir)
1688                self.copy_file(t, "%s/tarfiles/%s" % \
1689                        (tmpdir, os.path.basename(t)))
1690            for r in parse_rpms.list:
1691                if not os.path.exists("%s/rpms" % tmpdir):
1692                    os.mkdir("%s/rpms" % tmpdir)
1693                self.copy_file(r, "%s/rpms/%s" % \
1694                        (tmpdir, os.path.basename(r)))
1695        except IOError, e:
1696            raise service_error(service_error.internal, 
1697                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1698
1699        thread_pool_info = self.thread_pool()
1700        threads = [ ]
1701
1702        for tb in [ k for k in allocated.keys() if k != master]:
1703            # Wait until we have a free slot to start the next testbed load
1704            thread_pool_info.acquire()
1705            while thread_pool_info.started - \
1706                    thread_pool_info.terminated >= self.nthreads:
1707                thread_pool_info.wait()
1708            thread_pool_info.release()
1709
1710            # Create and start a thread to start the segment, and save it to
1711            # get the return value later
1712            t  = self.pooled_thread(target=self.start_segment, 
1713                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1714                    pdata=thread_pool_info, trace_file=self.trace_file)
1715            threads.append(t)
1716            t.start()
1717
1718        # Wait until all finish (the first clause of the while is to make sure
1719        # one starts)
1720        thread_pool_info.acquire()
1721        while thread_pool_info.started == 0 or \
1722                thread_pool_info.started > thread_pool_info.terminated:
1723            thread_pool_info.wait()
1724        thread_pool_info.release()
1725
1726        # If none failed, start the master
1727        failed = [ t.getName() for t in threads if not t.rv ]
1728
1729        if len(failed) == 0:
1730            if not self.start_segment(master, eid, tbparams, tmpdir):
1731                failed.append(master)
1732
1733        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1734        # If one failed clean up, unless fail_soft is set
1735        if failed:
1736            if not fail_soft:
1737                for tb in succeeded:
1738                    self.stop_segment(tb, eid, tbparams)
1739                # Remove the placeholder
1740                self.state_lock.acquire()
1741                del self.state[eid]
1742                self.state_lock.release()
1743
1744                raise service_error(service_error.federant,
1745                    "Swap in failed on %s" % ",".join(failed))
1746        else:
1747            self.log.info("[start_segment]: Experiment %s started" % eid)
1748
1749        # Generate an ID for the experiment (slice) and a certificate that the
1750        # allocator can use to prove they own it.  We'll ship it back through
1751        # the encrypted connection.
1752        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1753
1754        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1755
1756        # Walk up tmpdir, deleting as we go
1757        for path, dirs, files in os.walk(tmpdir, topdown=False):
1758            for f in files:
1759                os.remove(os.path.join(path, f))
1760            for d in dirs:
1761                os.rmdir(os.path.join(path, d))
1762        os.rmdir(tmpdir)
1763
1764        # The deepcopy prevents the allocation ID and other binaries from being
1765        # translated into other formats
1766        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1767                for tb in tbparams.keys() \
1768                    if tbparams[tb].has_key('federant') ],\
1769                    'vtopo': vtopo,\
1770                    'vis' : vis,
1771                    'experimentID' : [\
1772                            { 'fedid': copy.copy(expid) }, \
1773                            { 'localname': eid },\
1774                        ],\
1775                    'experimentAccess': { 'X509' : expcert },\
1776                }
1777
1778        # Insert the experiment into our state and update the disk copy
1779        self.state_lock.acquire()
1780        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1781                for tb in tbparams.keys() \
1782                    if tbparams[tb].has_key('federant') ],\
1783                    'vtopo': vtopo,\
1784                    'vis' : vis,
1785                    'owner': fid,
1786                    'experimentID' : [\
1787                            { 'fedid': expid }, { 'localname': eid },\
1788                        ],\
1789                }
1790        self.state[eid] = self.state[expid]
1791        if self.state_filename: self.write_state()
1792        self.state_lock.release()
1793
1794        self.auth.set_attribute(fid, expid)
1795        self.auth.set_attribute(expid, expid)
1796
1797        if not failed:
1798            return resp
1799        else:
1800            raise service_error(service_error.partial, \
1801                    "Partial swap in on %s" % ",".join(succeeded))
1802
1803    def check_experiment_access(self, fid, key):
1804        """
1805        Confirm that the fid has access to the experiment.  Though a request
1806        may be made in terms of a local name, the access attribute is always
1807        the experiment's fedid.
1808        """
1809        if not isinstance(key, fedid):
1810            self.state_lock.acquire()
1811            if self.state.has_key(key):
1812                try:
1813                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1814                            if f.has_key('fedid') ]
1815                except KeyError:
1816                    self.state_lock.release()
1817                    raise service_error(service_error.internal, 
1818                            "No fedid for experiment %s when checking " +\
1819                                    "access(!?)" % key)
1820                if len(kl) == 1:
1821                    key = kl[0]
1822                else:
1823                    self.state_lock.release()
1824                    raise service_error(service_error.internal, 
1825                            "multiple fedids for experiment %s when " +\
1826                                    "checking access(!?)" % key)
1827            else:
1828                self.state_lock.release()
1829                raise service_error(service_error.access, "Access Denied")
1830            self.state_lock.release()
1831
1832        if self.auth.check_attribute(fid, key):
1833            return True
1834        else:
1835            raise service_error(service_error.access, "Access Denied")
1836
1837
1838
1839    def get_vtopo(self, req, fid):
1840        """
1841        Return the stored virtual topology for this experiment
1842        """
1843        rv = None
1844
1845        req = req.get('VtopoRequestBody', None)
1846        if not req:
1847            raise service_error(service_error.req,
1848                    "Bad request format (no VtopoRequestBody)")
1849        exp = req.get('experiment', None)
1850        if exp:
1851            if exp.has_key('fedid'):
1852                key = exp['fedid']
1853                keytype = "fedid"
1854            elif exp.has_key('localname'):
1855                key = exp['localname']
1856                keytype = "localname"
1857            else:
1858                raise service_error(service_error.req, "Unknown lookup type")
1859        else:
1860            raise service_error(service_error.req, "No request?")
1861
1862        self.check_experiment_access(fid, key)
1863
1864        self.state_lock.acquire()
1865        if self.state.has_key(key):
1866            rv = { 'experiment' : {keytype: key },\
1867                    'vtopo': self.state[key]['vtopo'],\
1868                }
1869        self.state_lock.release()
1870
1871        if rv: return rv
1872        else: raise service_error(service_error.req, "No such experiment")
1873
1874    def get_vis(self, req, fid):
1875        """
1876        Return the stored visualization for this experiment
1877        """
1878        rv = None
1879
1880        req = req.get('VisRequestBody', None)
1881        if not req:
1882            raise service_error(service_error.req,
1883                    "Bad request format (no VisRequestBody)")
1884        exp = req.get('experiment', None)
1885        if exp:
1886            if exp.has_key('fedid'):
1887                key = exp['fedid']
1888                keytype = "fedid"
1889            elif exp.has_key('localname'):
1890                key = exp['localname']
1891                keytype = "localname"
1892            else:
1893                raise service_error(service_error.req, "Unknown lookup type")
1894        else:
1895            raise service_error(service_error.req, "No request?")
1896
1897        self.check_experiment_access(fid, key)
1898
1899        self.state_lock.acquire()
1900        if self.state.has_key(key):
1901            rv =  { 'experiment' : {keytype: key },\
1902                    'vis': self.state[key]['vis'],\
1903                    }
1904        self.state_lock.release()
1905
1906        if rv: return rv
1907        else: raise service_error(service_error.req, "No such experiment")
1908
1909    def get_info(self, req, fid):
1910        """
1911        Return all the stored info about this experiment
1912        """
1913        rv = None
1914
1915        req = req.get('InfoRequestBody', None)
1916        if not req:
1917            raise service_error(service_error.req,
1918                    "Bad request format (no VisRequestBody)")
1919        exp = req.get('experiment', None)
1920        if exp:
1921            if exp.has_key('fedid'):
1922                key = exp['fedid']
1923                keytype = "fedid"
1924            elif exp.has_key('localname'):
1925                key = exp['localname']
1926                keytype = "localname"
1927            else:
1928                raise service_error(service_error.req, "Unknown lookup type")
1929        else:
1930            raise service_error(service_error.req, "No request?")
1931
1932        self.check_experiment_access(fid, key)
1933
1934        # The state may be massaged by the service function that called
1935        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1936        # state.
1937        self.state_lock.acquire()
1938        if self.state.has_key(key):
1939            rv = copy.deepcopy(self.state[key])
1940        self.state_lock.release()
1941
1942        if rv: return rv
1943        else: raise service_error(service_error.req, "No such experiment")
1944
1945
1946    def terminate_experiment(self, req, fid):
1947        """
1948        Swap this experiment out on the federants and delete the shared
1949        information
1950        """
1951        tbparams = { }
1952        req = req.get('TerminateRequestBody', None)
1953        if not req:
1954            raise service_error(service_error.req,
1955                    "Bad request format (no TerminateRequestBody)")
1956        exp = req.get('experiment', None)
1957        if exp:
1958            if exp.has_key('fedid'):
1959                key = exp['fedid']
1960                keytype = "fedid"
1961            elif exp.has_key('localname'):
1962                key = exp['localname']
1963                keytype = "localname"
1964            else:
1965                raise service_error(service_error.req, "Unknown lookup type")
1966        else:
1967            raise service_error(service_error.req, "No request?")
1968
1969        self.check_experiment_access(fid, key)
1970
1971        self.state_lock.acquire()
1972        fed_exp = self.state.get(key, None)
1973
1974        if fed_exp:
1975            # This branch of the conditional holds the lock to generate a
1976            # consistent temporary tbparams variable to deallocate experiments.
1977            # It releases the lock to do the deallocations and reacquires it to
1978            # remove the experiment state when the termination is complete.
1979            ids = []
1980            #  experimentID is a list of dicts that are self-describing
1981            #  identifiers.  This finds all the fedids and localnames - the
1982            #  keys of self.state - and puts them into ids.
1983            for id in fed_exp.get('experimentID', []):
1984                if id.has_key('fedid'): ids.append(id['fedid'])
1985                if id.has_key('localname'): ids.append(id['localname'])
1986
1987            # Construct enough of the tbparams to make the stop_segment calls
1988            # work
1989            for fed in fed_exp['federant']:
1990                try:
1991                    for e in fed['name']:
1992                        eid = e.get('localname', None)
1993                        if eid: break
1994                    else:
1995                        continue
1996
1997                    p = fed['emulab']['project']
1998
1999                    project = p['name']['localname']
2000                    tb = p['testbed']['localname']
2001                    user = p['user'][0]['userID']['localname']
2002
2003                    domain = fed['emulab']['domain']
2004                    host  = "%s%s" % (fed['emulab']['ops'], domain)
2005                    aid = fed['allocID']
2006                except KeyError, e:
2007                    continue
2008                tbparams[tb] = {\
2009                        'user': user,\
2010                        'domain': domain,\
2011                        'project': project,\
2012                        'host': host,\
2013                        'eid': eid,\
2014                        'aid': aid,\
2015                    }
2016            self.state_lock.release()
2017
2018            # Stop everyone.
2019            for tb in tbparams.keys():
2020                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2021
2022            # release the allocations
2023            for tb in tbparams.keys():
2024                self.release_access(tb, tbparams[tb]['aid'])
2025
2026            # Remove the terminated experiment
2027            self.state_lock.acquire()
2028            for id in ids:
2029                if self.state.has_key(id): del self.state[id]
2030
2031            if self.state_filename: self.write_state()
2032            self.state_lock.release()
2033
2034            return { 'experiment': exp }
2035        else:
2036            # Don't forget to release the lock
2037            self.state_lock.release()
2038            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.