source: fedd/federation/experiment_control.py @ 416292f

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 416292f was c8853fd, checked in by Ted Faber <faber@…>, 16 years ago

Bug with experiment creation

  • Property mode set to 100644
File size: 60.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = None
152        self.cert_pwd = None
153        self.trusted_certs = None
154
155        # Walk through the various relevant certificat specifying config
156        # attributes until the local certificate attributes can be resolved.
157        # The walk is from most specific to most general specification.
158        for s in ("experiment_control", "globals"):
159            if config.has_section(s): 
160                if config.has_option(s, "cert_file"):
161                    if not self.cert_file:
162                        self.cert_file = config.get(s, "cert_file")
163                        self.cert_pwd = config.get(s, "cert_pwd")
164
165                if config.has_option(s, "trusted_certs"):
166                    if not self.trusted_certs:
167                        self.trusted_certs = config.get(s, "trusted_certs")
168
169
170        self.exp_stem = "fed-stem"
171        self.log = logging.getLogger("fedd.experiment_control")
172        set_log_level(config, "experiment_control", self.log)
173        self.muxmax = 2
174        self.nthreads = 2
175        self.randomize_experiments = False
176
177        self.scp_exec = "/usr/bin/scp"
178        self.splitter = None
179        self.ssh_exec="/usr/bin/ssh"
180        self.ssh_keygen = "/usr/bin/ssh-keygen"
181        self.ssh_identity_file = None
182
183
184        self.debug = config.getboolean("experiment_control", "create_debug")
185        self.state_filename = config.get("experiment_control", 
186                "experiment_state_file")
187        self.splitter_url = config.get("experiment_control", "splitter_url")
188        self.fedkit = config.get("experiment_control", "fedkit")
189        accessdb_file = config.get("experiment_control", "accessdb")
190
191        self.ssh_pubkey_file = config.get("experiment_control", 
192                "ssh_pubkey_file")
193        self.ssh_privkey_file = config.get("experiment_control",
194                "ssh_privkey_file")
195        # NB for internal master/slave ops, not experiment setup
196        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
197        self.state = { }
198        self.state_lock = Lock()
199        self.tclsh = "/usr/local/bin/otclsh"
200        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
201                config.get("experiment_control", "tcl_splitter",
202                        "/usr/testbed/lib/ns2ir/parse.tcl")
203        mapdb_file = config.get("experiment_control", "mapdb")
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
208                "/tmp/federate";
209        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
210                "FEDDIR/hosts";
211        self.def_gwstart = \
212                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
213                "/tmp/bridge.log";
214        self.def_mgwstart = \
215                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
216                "/tmp/bridge.log";
217        self.def_gwimage = "FBSD61-TUNNEL2";
218        self.def_gwtype = "pc";
219        self.local_access = { }
220
221        if auth:
222            self.auth = auth
223        else:
224            self.log.error(\
225                    "[access]: No authorizer initialized, creating local one.")
226            auth = authorizer()
227
228
229        if self.ssh_pubkey_file:
230            try:
231                f = open(self.ssh_pubkey_file, 'r')
232                self.ssh_pubkey = f.read()
233                f.close()
234            except IOError:
235                raise service_error(service_error.internal,
236                        "Cannot read sshpubkey")
237        else:
238            raise service_error(service_error.internal, 
239                    "No SSH public key file?")
240
241        if not self.ssh_privkey_file:
242            raise service_error(service_error.internal, 
243                    "No SSH public key file?")
244
245
246        if mapdb_file:
247            self.read_mapdb(mapdb_file)
248        else:
249            self.log.warn("[experiment_control] No testbed map, using defaults")
250            self.tbmap = { 
251                    'deter':'https://users.isi.deterlab.net:23235',
252                    'emulab':'https://users.isi.deterlab.net:23236',
253                    'ucb':'https://users.isi.deterlab.net:23237',
254                    }
255
256        if accessdb_file:
257                self.read_accessdb(accessdb_file)
258        else:
259            raise service_error(service_error.internal,
260                    "No accessdb specified in config")
261
262        # Grab saved state.  OK to do this w/o locking because it's read only
263        # and only one thread should be in existence that can see self.state at
264        # this point.
265        if self.state_filename:
266            self.read_state()
267
268        # Dispatch tables
269        self.soap_services = {\
270                'Create': soap_handler('Create', self.create_experiment),
271                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
272                'Vis': soap_handler('Vis', self.get_vis),
273                'Info': soap_handler('Info', self.get_info),
274                'Terminate': soap_handler('Terminate', 
275                    self.terminate_experiment),
276        }
277
278        self.xmlrpc_services = {\
279                'Create': xmlrpc_handler('Create', self.create_experiment),
280                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
281                'Vis': xmlrpc_handler('Vis', self.get_vis),
282                'Info': xmlrpc_handler('Info', self.get_info),
283                'Terminate': xmlrpc_handler('Terminate',
284                    self.terminate_experiment),
285        }
286
287    def copy_file(self, src, dest, size=1024):
288        """
289        Exceedingly simple file copy.
290        """
291        s = open(src,'r')
292        d = open(dest, 'w')
293
294        buf = "x"
295        while buf != "":
296            buf = s.read(size)
297            d.write(buf)
298        s.close()
299        d.close()
300
301    # Call while holding self.state_lock
302    def write_state(self):
303        """
304        Write a new copy of experiment state after copying the existing state
305        to a backup.
306
307        State format is a simple pickling of the state dictionary.
308        """
309        if os.access(self.state_filename, os.W_OK):
310            self.copy_file(self.state_filename, \
311                    "%s.bak" % self.state_filename)
312        try:
313            f = open(self.state_filename, 'w')
314            pickle.dump(self.state, f)
315        except IOError, e:
316            self.log.error("Can't write file %s: %s" % \
317                    (self.state_filename, e))
318        except pickle.PicklingError, e:
319            self.log.error("Pickling problem: %s" % e)
320        except TypeError, e:
321            self.log.error("Pickling problem (TypeError): %s" % e)
322
323    # Call while holding self.state_lock
324    def read_state(self):
325        """
326        Read a new copy of experiment state.  Old state is overwritten.
327
328        State format is a simple pickling of the state dictionary.
329        """
330        try:
331            f = open(self.state_filename, "r")
332            self.state = pickle.load(f)
333            self.log.debug("[read_state]: Read state from %s" % \
334                    self.state_filename)
335        except IOError, e:
336            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
337                    % (self.state_filename, e))
338        except pickle.UnpicklingError, e:
339            self.log.warning(("[read_state]: No saved state: " + \
340                    "Unpickling failed: %s") % e)
341       
342        for k in self.state.keys():
343            try:
344                # This list should only have one element in it, but phrasing it
345                # as a for loop doesn't cost much, really.  We have to find the
346                # fedid elements anyway.
347                for eid in [ f['fedid'] \
348                        for f in self.state[k]['experimentID']\
349                            if f.has_key('fedid') ]:
350                    self.auth.set_attribute(self.state[k]['owner'], eid)
351            except KeyError, e:
352                self.log.warning("[read_state]: State ownership or identity " +\
353                        "misformatted in %s: %s" % (self.state_filename, e))
354
355
356    def read_accessdb(self, accessdb_file):
357        """
358        Read the mapping from fedids that can create experiments to their name
359        in the 3-level access namespace.  All will be asserted from this
360        testbed and can include the local username and porject that will be
361        asserted on their behalf by this fedd.  Each fedid is also added to the
362        authorization system with the "create" attribute.
363        """
364        self.accessdb = {}
365        # These are the regexps for parsing the db
366        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
367        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
368                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
369        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
370                "\s*->\s*(" + name_expr + ")\s*$")
371        lineno = 0
372
373        # Parse the mappings and store in self.authdb, a dict of
374        # fedid -> (proj, user)
375        try:
376            f = open(accessdb_file, "r")
377            for line in f:
378                lineno += 1
379                line = line.strip()
380                if len(line) == 0 or line.startswith('#'):
381                    continue
382                m = project_line.match(line)
383                if m:
384                    fid = fedid(hexstr=m.group(1))
385                    project, user = m.group(2,3)
386                    if not self.accessdb.has_key(fid):
387                        self.accessdb[fid] = []
388                    self.accessdb[fid].append((project, user))
389                    continue
390
391                m = user_line.match(line)
392                if m:
393                    fid = fedid(hexstr=m.group(1))
394                    project = None
395                    user = m.group(2)
396                    if not self.accessdb.has_key(fid):
397                        self.accessdb[fid] = []
398                    self.accessdb[fid].append((project, user))
399                    continue
400                self.log.warn("[experiment_control] Error parsing access " +\
401                        "db %s at line %d" %  (accessdb_file, lineno))
402        except IOError:
403            raise service_error(service_error.internal,
404                    "Error opening/reading %s as experiment " +\
405                            "control accessdb" %  accessdb_file)
406        f.close()
407
408        # Initialize the authorization attributes
409        for fid in self.accessdb.keys():
410            self.auth.set_attribute(fid, 'create')
411
412    def read_mapdb(self, file):
413        """
414        Read a simple colon separated list of mappings for the
415        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
416        """
417
418        self.tbmap = { }
419        lineno =0
420        try:
421            f = open(file, "r")
422            for line in f:
423                lineno += 1
424                line = line.strip()
425                if line.startswith('#') or len(line) == 0:
426                    continue
427                try:
428                    label, url = line.split(':', 1)
429                    self.tbmap[label] = url
430                except ValueError, e:
431                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
432                            "map db: %s %s" % (lineno, line, e))
433        except IOError, e:
434            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
435                    "open %s: %s" % (file, e))
436        f.close()
437
438    def scp_file(self, file, user, host, dest=""):
439        """
440        scp a file to the remote host.  If debug is set the action is only
441        logged.
442        """
443
444        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', '-i', 
445                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
446        rv = 0
447
448        try:
449            dnull = open("/dev/null", "r")
450        except IOError:
451            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
452            dnull = Null
453
454        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
455        if not self.debug:
456            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
457            else: rv = call(scp_cmd)
458
459        return rv == 0
460
461    def ssh_cmd(self, user, host, cmd, wname=None):
462        """
463        Run a remote command on host as user.  If debug is set, the action is
464        only logged.
465        """
466        sh_str = "%s -o 'IdentitiesOnly yes' -i %s %s@%s %s" % \
467                (self.ssh_exec, self.ssh_privkey_file, 
468                        user, host, cmd)
469
470        try:
471            dnull = open("/dev/null", "r")
472        except IOError:
473            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
474            dnull = Null
475
476        self.log.debug("[ssh_cmd]: %s" % sh_str)
477        if not self.debug:
478            if dnull:
479                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
480            else:
481                sub = Popen(sh_str, shell=True)
482            return sub.wait() == 0
483        else:
484            return True
485
486    def ship_configs(self, host, user, src_dir, dest_dir):
487        """
488        Copy federant-specific configuration files to the federant.
489        """
490        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
491            return False
492        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
493            return False
494
495        for f in os.listdir(src_dir):
496            if os.path.isdir(f):
497                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
498                        "%s/%s" % (dest_dir, f)):
499                    return False
500            else:
501                if not self.scp_file("%s/%s" % (src_dir, f), 
502                        user, host, dest_dir):
503                    return False
504        return True
505
506    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
507        """
508        Start a sub-experiment on a federant.
509
510        Get the current state, modify or create as appropriate, ship data and
511        configs and start the experiment.  There are small ordering differences
512        based on the initial state of the sub-experiment.
513        """
514        # ops node in the federant
515        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
516        user = tbparams[tb]['user']     # federant user
517        pid = tbparams[tb]['project']   # federant project
518        # XXX
519        base_confs = ( "hosts",)
520        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
521        # command to test experiment state
522        expinfo_exec = "/usr/testbed/bin/expinfo" 
523        # Configuration directories on the remote machine
524        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
525        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
526        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
527        # Regular expressions to parse the expinfo response
528        state_re = re.compile("State:\s+(\w+)")
529        no_exp_re = re.compile("^No\s+such\s+experiment")
530        state = None    # Experiment state parsed from expinfo
531        # The expinfo ssh command.  Note the identity restriction to use only
532        # the identity provided in the pubkey given.
533        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-i', 
534                self.ssh_privkey_file, "%s@%s" % (user, host), 
535                expinfo_exec, pid, eid]
536
537        # Get status
538        self.log.debug("[start_segment]: %s"% " ".join(cmd))
539        dev_null = None
540        try:
541            dev_null = open("/dev/null", "a")
542        except IOError, e:
543            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
544
545        if self.debug:
546            state = 'swapped'
547            rv = 0
548        else:
549            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
550            for line in status.stdout:
551                m = state_re.match(line)
552                if m: state = m.group(1)
553                else:
554                    m = no_exp_re.match(line)
555                    if m: state = "none"
556            rv = status.wait()
557
558        # If the experiment is not present the subcommand returns a non-zero
559        # return value.  If we successfully parsed a "none" outcome, ignore the
560        # return code.
561        if rv != 0 and state != "none":
562            raise service_error(service_error.internal,
563                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
564
565        self.log.debug("[start_segment]: %s: %s" % (tb, state))
566        self.log.info("[start_segment]:transferring experiment to %s" % tb)
567
568        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
569            return False
570        # Clear the federation config dirs
571        if not self.ssh_cmd(user, host, 
572                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
573            return False
574        # Clear and create the tarfiles and rpm directories
575        for d in (tarfiles_dir, rpms_dir):
576            if not self.ssh_cmd(user, host, 
577                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
578                return False
579            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
580                    "create tarfiles"):
581                return False
582       
583        if state == 'active':
584            # Create the federation config dirs (do not move outside the
585            # conditional.  Happens later in new expriment creation)
586            if not self.ssh_cmd(user, host, 
587                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
588                return False
589            # Remote experiment is active.  Modify it.
590            for f in base_confs:
591                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
592                        "%s/%s" % (proj_dir, f)):
593                    return False
594            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
595                    proj_dir):
596                return False
597            if os.path.isdir("%s/tarfiles" % tmpdir):
598                if not self.ship_configs(host, user,
599                        "%s/tarfiles" % tmpdir, tarfiles_dir):
600                    return False
601            if os.path.isdir("%s/rpms" % tmpdir):
602                if not self.ship_configs(host, user,
603                        "%s/rpms" % tmpdir, tarfiles_dir):
604                    return False
605            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
606            if not self.ssh_cmd(user, host,
607                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
608                            (pid, eid, tclfile), "modexp"):
609                return False
610            return True
611        elif state == "swapped":
612            # Create the federation config dirs (do not move outside the
613            # conditional.  Happens later in new expriment creation)
614            if not self.ssh_cmd(user, host, 
615                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
616                return False
617            # Remote experiment swapped out.  Modify it and swap it in.
618            for f in base_confs:
619                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
620                        "%s/%s" % (proj_dir, f)):
621                    return False
622            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
623                    proj_dir):
624                return False
625            if os.path.isdir("%s/tarfiles" % tmpdir):
626                if not self.ship_configs(host, user,
627                        "%s/tarfiles" % tmpdir, tarfiles_dir):
628                    return False
629            if os.path.isdir("%s/rpms" % tmpdir):
630                if not self.ship_configs(host, user,
631                        "%s/rpms" % tmpdir, tarfiles_dir):
632                    return False
633            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
634            if not self.ssh_cmd(user, host,
635                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
636                    "modexp"):
637                return False
638            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
639            if not self.ssh_cmd(user, host,
640                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
641                    "swapexp"):
642                return False
643            return True
644        elif state == "none":
645            # No remote experiment.  Create one.  We do this in 2 steps so we
646            # can put the configuration files and scripts into the new
647            # experiment directories.
648
649            # Tarfiles must be present for creation to work
650            if os.path.isdir("%s/tarfiles" % tmpdir):
651                if not self.ship_configs(host, user,
652                        "%s/tarfiles" % tmpdir, tarfiles_dir):
653                    return False
654            if os.path.isdir("%s/rpms" % tmpdir):
655                if not self.ship_configs(host, user,
656                        "%s/rpms" % tmpdir, tarfiles_dir):
657                    return False
658            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
659            if not self.ssh_cmd(user, host,
660                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
661                            (pid, eid, tclfile), "startexp"):
662                return False
663            # Create the federation config dirs (do not move outside the
664            # conditional.)
665            if not self.ssh_cmd(user, host, 
666                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
667                return False
668            # After startexp the per-experiment directories exist
669            for f in base_confs:
670                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
671                        "%s/%s" % (proj_dir, f)):
672                    return False
673            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
674                    proj_dir):
675                return False
676            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
677            if not self.ssh_cmd(user, host,
678                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
679                    "swapexp"):
680                return False
681            return True
682        else:
683            self.log.debug("[start_segment]:unknown state %s" % state)
684            return False
685
686    def stop_segment(self, tb, eid, tbparams):
687        """
688        Stop a sub experiment by calling swapexp on the federant
689        """
690        user = tbparams[tb]['user']
691        host = tbparams[tb]['host']
692        pid = tbparams[tb]['project']
693
694        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
695        return self.ssh_cmd(user, host,
696                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
697
698       
699    def generate_ssh_keys(self, dest, type="rsa" ):
700        """
701        Generate a set of keys for the gateways to use to talk.
702
703        Keys are of type type and are stored in the required dest file.
704        """
705        valid_types = ("rsa", "dsa")
706        t = type.lower();
707        if t not in valid_types: raise ValueError
708        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
709
710        try:
711            trace = open("/dev/null", "w")
712        except IOError:
713            raise service_error(service_error.internal,
714                    "Cannot open /dev/null??");
715
716        # May raise CalledProcessError
717        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
718        rv = call(cmd, stdout=trace, stderr=trace)
719        if rv != 0:
720            raise service_error(service_error.internal, 
721                    "Cannot generate nonce ssh keys.  %s return code %d" \
722                            % (self.ssh_keygen, rv))
723
724    def gentopo(self, str):
725        """
726        Generate the topology dtat structure from the splitter's XML
727        representation of it.
728
729        The topology XML looks like:
730            <experiment>
731                <nodes>
732                    <node><vname></vname><ips>ip1:ip2</ips></node>
733                </nodes>
734                <lans>
735                    <lan>
736                        <vname></vname><vnode></vnode><ip></ip>
737                        <bandwidth></bandwidth><member>node:port</member>
738                    </lan>
739                </lans>
740        """
741        class topo_parse:
742            """
743            Parse the topology XML and create the dats structure.
744            """
745            def __init__(self):
746                # Typing of the subelements for data conversion
747                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
748                self.int_subelements = ( 'bandwidth',)
749                self.float_subelements = ( 'delay',)
750                # The final data structure
751                self.nodes = [ ]
752                self.lans =  [ ]
753                self.topo = { \
754                        'node': self.nodes,\
755                        'lan' : self.lans,\
756                    }
757                self.element = { }  # Current element being created
758                self.chars = ""     # Last text seen
759
760            def end_element(self, name):
761                # After each sub element the contents is added to the current
762                # element or to the appropriate list.
763                if name == 'node':
764                    self.nodes.append(self.element)
765                    self.element = { }
766                elif name == 'lan':
767                    self.lans.append(self.element)
768                    self.element = { }
769                elif name in self.str_subelements:
770                    self.element[name] = self.chars
771                    self.chars = ""
772                elif name in self.int_subelements:
773                    self.element[name] = int(self.chars)
774                    self.chars = ""
775                elif name in self.float_subelements:
776                    self.element[name] = float(self.chars)
777                    self.chars = ""
778
779            def found_chars(self, data):
780                self.chars += data.rstrip()
781
782
783        tp = topo_parse();
784        parser = xml.parsers.expat.ParserCreate()
785        parser.EndElementHandler = tp.end_element
786        parser.CharacterDataHandler = tp.found_chars
787
788        parser.Parse(str)
789
790        return tp.topo
791       
792
793    def genviz(self, topo):
794        """
795        Generate the visualization the virtual topology
796        """
797
798        neato = "/usr/local/bin/neato"
799        # These are used to parse neato output and to create the visualization
800        # file.
801        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
802        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
803                "%s</type></node>"
804
805        try:
806            # Node names
807            nodes = [ n['vname'] for n in topo['node'] ]
808            topo_lans = topo['lan']
809        except KeyError:
810            raise service_error(service_error.internal, "Bad topology")
811
812        lans = { }
813        links = { }
814
815        # Walk through the virtual topology, organizing the connections into
816        # 2-node connections (links) and more-than-2-node connections (lans).
817        # When a lan is created, it's added to the list of nodes (there's a
818        # node in the visualization for the lan).
819        for l in topo_lans:
820            if links.has_key(l['vname']):
821                if len(links[l['vname']]) < 2:
822                    links[l['vname']].append(l['vnode'])
823                else:
824                    nodes.append(l['vname'])
825                    lans[l['vname']] = links[l['vname']]
826                    del links[l['vname']]
827                    lans[l['vname']].append(l['vnode'])
828            elif lans.has_key(l['vname']):
829                lans[l['vname']].append(l['vnode'])
830            else:
831                links[l['vname']] = [ l['vnode'] ]
832
833
834        # Open up a temporary file for dot to turn into a visualization
835        try:
836            df, dotname = tempfile.mkstemp()
837            dotfile = os.fdopen(df, 'w')
838        except IOError:
839            raise service_error(service_error.internal,
840                    "Failed to open file in genviz")
841
842        # Generate a dot/neato input file from the links, nodes and lans
843        try:
844            print >>dotfile, "graph G {"
845            for n in nodes:
846                print >>dotfile, '\t"%s"' % n
847            for l in links.keys():
848                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
849            for l in lans.keys():
850                for n in lans[l]:
851                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
852            print >>dotfile, "}"
853            dotfile.close()
854        except TypeError:
855            raise service_error(service_error.internal,
856                    "Single endpoint link in vtopo")
857        except IOError:
858            raise service_error(service_error.internal, "Cannot write dot file")
859
860        # Use dot to create a visualization
861        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
862                '-Gpack=true', dotname], stdout=PIPE)
863
864        # Translate dot to vis format
865        vis_nodes = [ ]
866        vis = { 'node': vis_nodes }
867        for line in dot.stdout:
868            m = vis_re.match(line)
869            if m:
870                vn = m.group(1)
871                vis_node = {'name': vn, \
872                        'x': float(m.group(2)),\
873                        'y' : float(m.group(3)),\
874                    }
875                if vn in links.keys() or vn in lans.keys():
876                    vis_node['type'] = 'lan'
877                else:
878                    vis_node['type'] = 'node'
879                vis_nodes.append(vis_node)
880        rv = dot.wait()
881
882        os.remove(dotname)
883        if rv == 0 : return vis
884        else: return None
885
886    def get_access(self, tb, nodes, user, tbparam, master, export_project,
887            access_user):
888        """
889        Get access to testbed through fedd and set the parameters for that tb
890        """
891
892        translate_attr = {
893            'slavenodestartcmd': 'expstart',
894            'slaveconnectorstartcmd': 'gwstart',
895            'masternodestartcmd': 'mexpstart',
896            'masterconnectorstartcmd': 'mgwstart',
897            'connectorimage': 'gwimage',
898            'connectortype': 'gwtype',
899            'tunnelcfg': 'tun',
900            'smbshare': 'smbshare',
901        }
902
903        uri = self.tbmap.get(tb, None)
904        if not uri:
905            raise service_error(serice_error.server_config, 
906                    "Unknown testbed: %s" % tb)
907
908        # currently this lumps all users into one service access group
909        service_keys = [ a for u in user \
910                for a in u.get('access', []) \
911                    if a.has_key('sshPubkey')]
912
913        if len(service_keys) == 0:
914            raise service_error(service_error.req, 
915                    "Must have at least one SSH pubkey for services")
916
917
918        for p, u in access_user:
919            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
920                    "to %s") %  ((p or "None"), u, uri))
921
922            if p:
923                # Request with user and project specified
924                req = {\
925                        'destinationTestbed' : { 'uri' : uri },
926                        'project': { 
927                            'name': {'localname': p},
928                            'user': [ {'userID': { 'localname': u } } ],
929                            },
930                        'user':  user,
931                        'allocID' : { 'localname': 'test' },
932                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
933                        'serviceAccess' : service_keys
934                    }
935            else:
936                # Request with only user specified
937                req = {\
938                        'destinationTestbed' : { 'uri' : uri },
939                        'user':  [ {'userID': { 'localname': u } } ],
940                        'allocID' : { 'localname': 'test' },
941                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
942                        'serviceAccess' : service_keys
943                    }
944
945            if tb == master:
946                # NB, the export_project parameter is a dict that includes
947                # the type
948                req['exportProject'] = export_project
949
950            # node resources if any
951            if nodes != None and len(nodes) > 0:
952                rnodes = [ ]
953                for n in nodes:
954                    rn = { }
955                    image, hw, count = n.split(":")
956                    if image: rn['image'] = [ image ]
957                    if hw: rn['hardware'] = [ hw ]
958                    if count and int(count) >0 : rn['count'] = int(count)
959                    rnodes.append(rn)
960                req['resources']= { }
961                req['resources']['node'] = rnodes
962
963            try:
964                if self.local_access.has_key(uri):
965                    # Local access call
966                    req = { 'RequestAccessRequestBody' : req }
967                    r = self.local_access[uri].RequestAccess(req, 
968                            fedid(file=self.cert_file))
969                    r = { 'RequestAccessResponseBody' : r }
970                else:
971                    r = self.call_RequestAccess(uri, req, 
972                            self.cert_file, self.cert_pwd, self.trusted_certs)
973            except service_error, e:
974                if e.code == service_error.access:
975                    self.log.debug("[get_access] Access denied")
976                    r = None
977                    continue
978                else:
979                    raise e
980
981            if r.has_key('RequestAccessResponseBody'):
982                # Through to here we have a valid response, not a fault.
983                # Access denied is a fault, so something better or worse than
984                # access denied has happened.
985                r = r['RequestAccessResponseBody']
986                self.log.debug("[get_access] Access granted")
987                break
988            else:
989                raise service_error(service_error.protocol,
990                        "Bad proxy response")
991       
992        if not r:
993            raise service_error(service_error.access, 
994                    "Access denied by %s (%s)" % (tb, uri))
995
996        e = r['emulab']
997        p = e['project']
998        tbparam[tb] = { 
999                "boss": e['boss'],
1000                "host": e['ops'],
1001                "domain": e['domain'],
1002                "fs": e['fileServer'],
1003                "eventserver": e['eventServer'],
1004                "project": unpack_id(p['name']),
1005                "emulab" : e,
1006                "allocID" : r['allocID'],
1007                }
1008        # Make the testbed name be the label the user applied
1009        p['testbed'] = {'localname': tb }
1010
1011        for u in p['user']:
1012            tbparam[tb]['user'] = unpack_id(u['userID'])
1013
1014        for a in e['fedAttr']:
1015            if a['attribute']:
1016                key = translate_attr.get(a['attribute'].lower(), None)
1017                if key:
1018                    tbparam[tb][key]= a['value']
1019       
1020    def release_access(self, tb, aid):
1021        """
1022        Release access to testbed through fedd
1023        """
1024
1025        uri = self.tbmap.get(tb, None)
1026        if not uri:
1027            raise service_error(serice_error.server_config, 
1028                    "Unknown testbed: %s" % tb)
1029
1030        if self.local_access.has_key(uri):
1031            resp = self.local_access[uri].ReleaseAccess(\
1032                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1033                    fedid(file=self.cert_file))
1034            resp = { 'ReleaseAccessResponseBody': resp } 
1035        else:
1036            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1037                    self.cert_file, self.cert_pwd, self.trusted_certs)
1038
1039        # better error coding
1040
1041    def remote_splitter(self, uri, desc, master):
1042
1043        req = {
1044                'description' : { 'ns2description': desc },
1045                'master': master,
1046                'include_fedkit': bool(self.fedkit)
1047            }
1048
1049        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1050                self.trusted_certs)
1051
1052        if r.has_key('Ns2SplitResponseBody'):
1053            r = r['Ns2SplitResponseBody']
1054            if r.has_key('output'):
1055                return r['output'].splitlines()
1056            else:
1057                raise service_error(service_error.protocol, 
1058                        "Bad splitter response (no output)")
1059        else:
1060            raise service_error(service_error.protocol, "Bad splitter response")
1061       
1062    class current_testbed:
1063        """
1064        Object for collecting the current testbed description.  The testbed
1065        description is saved to a file with the local testbed variables
1066        subsittuted line by line.
1067        """
1068        def __init__(self, eid, tmpdir, fedkit):
1069            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1070            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1071            self.current_testbed = None
1072            self.testbed_file = None
1073
1074            self.def_expstart = \
1075                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1076            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1077            self.def_gwstart = \
1078                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1079            self.def_mgwstart = \
1080                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1081            self.def_gwimage = "FBSD61-TUNNEL2";
1082            self.def_gwtype = "pc";
1083
1084            self.eid = eid
1085            self.tmpdir = tmpdir
1086            self.fedkit = fedkit
1087
1088        def __call__(self, line, master, allocated, tbparams):
1089            # Capture testbed topology descriptions
1090            if self.current_testbed == None:
1091                m = self.begin_testbed.match(line)
1092                if m != None:
1093                    self.current_testbed = m.group(1)
1094                    if self.current_testbed == None:
1095                        raise service_error(service_error.req,
1096                                "Bad request format (unnamed testbed)")
1097                    allocated[self.current_testbed] = \
1098                            allocated.get(self.current_testbed,0) + 1
1099                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1100                    if not os.path.exists(tb_dir):
1101                        try:
1102                            os.mkdir(tb_dir)
1103                        except IOError:
1104                            raise service_error(service_error.internal,
1105                                    "Cannot create %s" % tb_dir)
1106                    try:
1107                        self.testbed_file = open("%s/%s.%s.tcl" %
1108                                (tb_dir, self.eid, self.current_testbed), 'w')
1109                    except IOError:
1110                        self.testbed_file = None
1111                    return True
1112                else: return False
1113            else:
1114                m = self.end_testbed.match(line)
1115                if m != None:
1116                    if m.group(1) != self.current_testbed:
1117                        raise service_error(service_error.internal, 
1118                                "Mismatched testbed markers!?")
1119                    if self.testbed_file != None: 
1120                        self.testbed_file.close()
1121                        self.testbed_file = None
1122                    self.current_testbed = None
1123                elif self.testbed_file:
1124                    # Substitute variables and put the line into the local
1125                    # testbed file.
1126                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1127                            self.def_gwtype)
1128                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1129                            self.def_gwimage)
1130                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1131                            self.def_mgwstart)
1132                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1133                            self.def_mexpstart)
1134                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1135                            self.def_gwstart)
1136                    expstart = tbparams[self.current_testbed].get('expstart', 
1137                            self.def_expstart)
1138                    project = tbparams[self.current_testbed].get('project')
1139                    line = re.sub("GWTYPE", gwtype, line)
1140                    line = re.sub("GWIMAGE", gwimage, line)
1141                    if self.current_testbed == master:
1142                        line = re.sub("GWSTART", mgwstart, line)
1143                        line = re.sub("EXPSTART", mexpstart, line)
1144                    else:
1145                        line = re.sub("GWSTART", gwstart, line)
1146                        line = re.sub("EXPSTART", expstart, line)
1147                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1148                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1149                    line = re.sub("EID", self.eid, line)
1150                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1151                            (project, self.eid), line)
1152                    if self.fedkit:
1153                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1154                                line)
1155                    print >>self.testbed_file, line
1156                return True
1157
1158    class allbeds:
1159        """
1160        Process the Allbeds section.  Get access to each federant and save the
1161        parameters in tbparams
1162        """
1163        def __init__(self, get_access):
1164            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1165            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1166            self.in_allbeds = False
1167            self.get_access = get_access
1168
1169        def __call__(self, line, user, tbparams, master, export_project,
1170                access_user):
1171            # Testbed access parameters
1172            if not self.in_allbeds:
1173                if self.begin_allbeds.match(line):
1174                    self.in_allbeds = True
1175                    return True
1176                else:
1177                    return False
1178            else:
1179                if self.end_allbeds.match(line):
1180                    self.in_allbeds = False
1181                else:
1182                    nodes = line.split('|')
1183                    tb = nodes.pop(0)
1184                    self.get_access(tb, nodes, user, tbparams, master,
1185                            export_project, access_user)
1186                return True
1187
1188    class gateways:
1189        def __init__(self, eid, master, tmpdir, gw_pubkey,
1190                gw_secretkey, copy_file, fedkit):
1191            self.begin_gateways = \
1192                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1193            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1194            self.current_gateways = None
1195            self.control_gateway = None
1196            self.active_end = { }
1197
1198            self.eid = eid
1199            self.master = master
1200            self.tmpdir = tmpdir
1201            self.gw_pubkey_base = gw_pubkey
1202            self.gw_secretkey_base = gw_secretkey
1203
1204            self.copy_file = copy_file
1205            self.fedkit = fedkit
1206
1207
1208        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1209                active_end, tbparams, dtb, myname, desthost, type):
1210            """
1211            Produce a gateway configuration file from a gateways line.
1212            """
1213
1214            sproject = tbparams[gw].get('project', 'project')
1215            dproject = tbparams[dtb].get('project', 'project')
1216            sdomain = ".%s.%s%s" % (eid, sproject,
1217                    tbparams[gw].get('domain', ".example.com"))
1218            ddomain = ".%s.%s%s" % (eid, dproject,
1219                    tbparams[dtb].get('domain', ".example.com"))
1220            boss = tbparams[master].get('boss', "boss")
1221            fs = tbparams[master].get('fs', "fs")
1222            event_server = "%s%s" % \
1223                    (tbparams[gw].get('eventserver', "event_server"),
1224                            tbparams[gw].get('domain', "example.com"))
1225            remote_event_server = "%s%s" % \
1226                    (tbparams[dtb].get('eventserver', "event_server"),
1227                            tbparams[dtb].get('domain', "example.com"))
1228            seer_control = "%s%s" % \
1229                    (tbparams[gw].get('control', "control"), sdomain)
1230
1231            if self.fedkit:
1232                remote_script_dir = "/usr/local/federation/bin"
1233                local_script_dir = "/usr/local/federation/bin"
1234            else:
1235                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1236                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1237
1238            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1239            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1240            tunnel_cfg = tbparams[gw].get("tun", "false")
1241
1242            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1243            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1244
1245            # translate to lower case so the `hostname` hack for specifying
1246            # configuration files works.
1247            conf_file = conf_file.lower();
1248            remote_conf_file = remote_conf_file.lower();
1249
1250            if dtb == master:
1251                active = "false"
1252            elif gw == master:
1253                active = "true"
1254            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1255                active = "false"
1256            else:
1257                active_end['%s-%s' % (gw, dtb)] = 1
1258                active = "true"
1259
1260            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1261            print >>gwconfig, "Active: %s" % active
1262            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1263            print >>gwconfig, "BossName: %s" % boss
1264            print >>gwconfig, "FsName: %s" % fs
1265            print >>gwconfig, "EventServerName: %s" % event_server
1266            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1267            print >>gwconfig, "SeerControl: %s" % seer_control
1268            print >>gwconfig, "Type: %s" % type
1269            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1270            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1271                    local_script_dir
1272            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1273            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1274            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1275                    (remote_conf_dir, remote_conf_file)
1276            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1277            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1278            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1279            gwconfig.close()
1280
1281            return active == "true"
1282
1283        def __call__(self, line, allocated, tbparams):
1284            # Process gateways
1285            if not self.current_gateways:
1286                m = self.begin_gateways.match(line)
1287                if m:
1288                    self.current_gateways = m.group(1)
1289                    if allocated.has_key(self.current_gateways):
1290                        # This test should always succeed
1291                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1292                        if not os.path.exists(tb_dir):
1293                            try:
1294                                os.mkdir(tb_dir)
1295                            except IOError:
1296                                raise service_error(service_error.internal,
1297                                        "Cannot create %s" % tb_dir)
1298                    else:
1299                        # XXX
1300                        self.log.error("[gateways]: Ignoring gateways for " + \
1301                                "unknown testbed %s" % self.current_gateways)
1302                        self.current_gateways = None
1303                    return True
1304                else:
1305                    return False
1306            else:
1307                m = self.end_gateways.match(line)
1308                if m :
1309                    if m.group(1) != self.current_gateways:
1310                        raise service_error(service_error.internal,
1311                                "Mismatched gateway markers!?")
1312                    if self.control_gateway:
1313                        try:
1314                            cc = open("%s/%s/client.conf" %
1315                                    (self.tmpdir, self.current_gateways), 'w')
1316                            print >>cc, "ControlGateway: %s" % \
1317                                    self.control_gateway
1318                            if tbparams[self.master].has_key('smbshare'):
1319                                print >>cc, "SMBSHare: %s" % \
1320                                        tbparams[self.master]['smbshare']
1321                            print >>cc, "ProjectUser: %s" % \
1322                                    tbparams[self.master]['user']
1323                            print >>cc, "ProjectName: %s" % \
1324                                    tbparams[self.master]['project']
1325                            cc.close()
1326                        except IOError:
1327                            raise service_error(service_error.internal,
1328                                    "Error creating client config")
1329                        try:
1330                            cc = open("%s/%s/seer.conf" %
1331                                    (self.tmpdir, self.current_gateways),
1332                                    'w')
1333                            if self.current_gateways != self.master:
1334                                print >>cc, "ControlNode: %s" % \
1335                                        self.control_gateway
1336                            print >>cc, "ExperimentID: %s/%s" % \
1337                                    ( tbparams[self.master]['project'], \
1338                                    self.eid )
1339                            cc.close()
1340                        except IOError:
1341                            raise service_error(service_error.internal,
1342                                    "Error creating seer config")
1343                    else:
1344                        debug.error("[gateways]: No control gateway for %s" %\
1345                                    self.current_gateways)
1346                    self.current_gateways = None
1347                else:
1348                    dtb, myname, desthost, type = line.split(" ")
1349
1350                    if type == "control" or type == "both":
1351                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1352                                self.eid, 
1353                                tbparams[self.current_gateways]['project'],
1354                                tbparams[self.current_gateways]['domain'])
1355                    try:
1356                        active = self.gateway_conf_file(self.current_gateways,
1357                                self.master, self.eid, self.gw_pubkey_base,
1358                                self.gw_secretkey_base,
1359                                self.active_end, tbparams, dtb, myname,
1360                                desthost, type)
1361                    except IOError, e:
1362                        raise service_error(service_error.internal,
1363                                "Failed to write config file for %s" % \
1364                                        self.current_gateway)
1365           
1366                    gw_pubkey = "%s/keys/%s" % \
1367                            (self.tmpdir, self.gw_pubkey_base)
1368                    gw_secretkey = "%s/keys/%s" % \
1369                            (self.tmpdir, self.gw_secretkey_base)
1370
1371                    pkfile = "%s/%s/%s" % \
1372                            ( self.tmpdir, self.current_gateways, 
1373                                    self.gw_pubkey_base)
1374                    skfile = "%s/%s/%s" % \
1375                            ( self.tmpdir, self.current_gateways, 
1376                                    self.gw_secretkey_base)
1377
1378                    if not os.path.exists(pkfile):
1379                        try:
1380                            self.copy_file(gw_pubkey, pkfile)
1381                        except IOError:
1382                            service_error(service_error.internal,
1383                                    "Failed to copy pubkey file")
1384
1385                    if active and not os.path.exists(skfile):
1386                        try:
1387                            self.copy_file(gw_secretkey, skfile)
1388                        except IOError:
1389                            service_error(service_error.internal,
1390                                    "Failed to copy secretkey file")
1391                return True
1392
1393    class shunt_to_file:
1394        """
1395        Simple class to write data between two regexps to a file.
1396        """
1397        def __init__(self, begin, end, filename):
1398            """
1399            Begin shunting on a match of begin, stop on end, send data to
1400            filename.
1401            """
1402            self.begin = re.compile(begin)
1403            self.end = re.compile(end)
1404            self.in_shunt = False
1405            self.file = None
1406            self.filename = filename
1407
1408        def __call__(self, line):
1409            """
1410            Call this on each line in the input that may be shunted.
1411            """
1412            if not self.in_shunt:
1413                if self.begin.match(line):
1414                    self.in_shunt = True
1415                    try:
1416                        self.file = open(self.filename, "w")
1417                    except:
1418                        self.file = None
1419                        raise
1420                    return True
1421                else:
1422                    return False
1423            else:
1424                if self.end.match(line):
1425                    if self.file: 
1426                        self.file.close()
1427                        self.file = None
1428                    self.in_shunt = False
1429                else:
1430                    if self.file:
1431                        print >>self.file, line
1432                return True
1433
1434    class shunt_to_list:
1435        """
1436        Same interface as shunt_to_file.  Data collected in self.list, one list
1437        element per line.
1438        """
1439        def __init__(self, begin, end):
1440            self.begin = re.compile(begin)
1441            self.end = re.compile(end)
1442            self.in_shunt = False
1443            self.list = [ ]
1444       
1445        def __call__(self, line):
1446            if not self.in_shunt:
1447                if self.begin.match(line):
1448                    self.in_shunt = True
1449                    return True
1450                else:
1451                    return False
1452            else:
1453                if self.end.match(line):
1454                    self.in_shunt = False
1455                else:
1456                    self.list.append(line)
1457                return True
1458
1459    class shunt_to_string:
1460        """
1461        Same interface as shunt_to_file.  Data collected in self.str, all in
1462        one string.
1463        """
1464        def __init__(self, begin, end):
1465            self.begin = re.compile(begin)
1466            self.end = re.compile(end)
1467            self.in_shunt = False
1468            self.str = ""
1469       
1470        def __call__(self, line):
1471            if not self.in_shunt:
1472                if self.begin.match(line):
1473                    self.in_shunt = True
1474                    return True
1475                else:
1476                    return False
1477            else:
1478                if self.end.match(line):
1479                    self.in_shunt = False
1480                else:
1481                    self.str += line
1482                return True
1483
1484    def create_experiment(self, req, fid):
1485        """
1486        The external interface to experiment creation called from the
1487        dispatcher.
1488
1489        Creates a working directory, splits the incoming description using the
1490        splitter script and parses out the avrious subsections using the
1491        lcasses above.  Once each sub-experiment is created, use pooled threads
1492        to instantiate them and start it all up.
1493        """
1494
1495        if not self.auth.check_attribute(fid, 'create'):
1496            raise service_error(service_error.access, "Create access denied")
1497
1498        try:
1499            tmpdir = tempfile.mkdtemp(prefix="split-")
1500        except IOError:
1501            raise service_error(service_error.internal, "Cannot create tmp dir")
1502
1503        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1504        gw_secretkey_base = "fed.%s" % self.ssh_type
1505        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1506        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1507        tclfile = tmpdir + "/experiment.tcl"
1508        tbparams = { }
1509        try:
1510            access_user = self.accessdb[fid]
1511        except KeyError:
1512            raise service_error(service_error.internal,
1513                    "Access map and authorizer out of sync in " + \
1514                            "create_experiment for fedid %s"  % fid)
1515
1516        pid = "dummy"
1517        gid = "dummy"
1518        # XXX
1519        fail_soft = False
1520
1521        try:
1522            os.mkdir(tmpdir+"/keys")
1523        except OSError:
1524            raise service_error(service_error.internal,
1525                    "Can't make temporary dir")
1526
1527        req = req.get('CreateRequestBody', None)
1528        if not req:
1529            raise service_error(service_error.req,
1530                    "Bad request format (no CreateRequestBody)")
1531        # The tcl parser needs to read a file so put the content into that file
1532        descr=req.get('experimentdescription', None)
1533        if descr:
1534            file_content=descr.get('ns2description', None)
1535            if file_content:
1536                try:
1537                    f = open(tclfile, 'w')
1538                    f.write(file_content)
1539                    f.close()
1540                except IOError:
1541                    raise service_error(service_error.internal,
1542                            "Cannot write temp experiment description")
1543            else:
1544                raise service_error(service_error.req, 
1545                        "Only ns2descriptions supported")
1546        else:
1547            raise service_error(service_error.req, "No experiment description")
1548
1549        if req.has_key('experimentID') and \
1550                req['experimentID'].has_key('localname'):
1551            eid = req['experimentID']['localname']
1552            self.state_lock.acquire()
1553            while (self.state.has_key(eid)):
1554                eid += random.choice(string.ascii_letters)
1555            # To avoid another thread picking this localname
1556            self.state[eid] = "placeholder"
1557            self.state_lock.release()
1558        else:
1559            eid = self.exp_stem
1560            for i in range(0,5):
1561                eid += random.choice(string.ascii_letters)
1562            self.state_lock.acquire()
1563            while (self.state.has_key(eid)):
1564                eid = self.exp_stem
1565                for i in range(0,5):
1566                    eid += random.choice(string.ascii_letters)
1567            # To avoid another thread picking this localname
1568            self.state[eid] = "placeholder"
1569            self.state_lock.release()
1570
1571        try: 
1572            # This catches exceptions to clear the placeholder if necessary
1573            try:
1574                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1575            except ValueError:
1576                raise service_error(service_error.server_config, 
1577                        "Bad key type (%s)" % self.ssh_type)
1578
1579            user = req.get('user', None)
1580            if user == None:
1581                raise service_error(service_error.req, "No user")
1582
1583            master = req.get('master', None)
1584            if not master:
1585                raise service_error(service_error.req,
1586                        "No master testbed label")
1587            export_project = req.get('exportProject', None)
1588            if not export_project:
1589                raise service_error(service_error.req, "No export project")
1590           
1591            if self.splitter_url:
1592                self.log.debug("Calling remote splitter at %s" % \
1593                        self.splitter_url)
1594                split_data = self.remote_splitter(self.splitter_url,
1595                        file_content, master)
1596            else:
1597                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1598                    str(self.muxmax), '-m', master]
1599
1600                if self.fedkit:
1601                    tclcmd.append('-k')
1602
1603                tclcmd.extend([pid, gid, eid, tclfile])
1604
1605                self.log.debug("running local splitter %s", " ".join(tclcmd))
1606                tclparser = Popen(tclcmd, stdout=PIPE)
1607                split_data = tclparser.stdout
1608
1609            allocated = { }         # Testbeds we can access
1610            started = { }           # Testbeds where a sub-experiment started
1611                                # successfully
1612
1613            # Objects to parse the splitter output (defined above)
1614            parse_current_testbed = self.current_testbed(eid, tmpdir,
1615                    self.fedkit)
1616            parse_allbeds = self.allbeds(self.get_access)
1617            parse_gateways = self.gateways(eid, master, tmpdir,
1618                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1619                    self.fedkit)
1620            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1621                        "^#\s+End\s+Vtopo")
1622            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1623                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1624            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1625                    "^#\s+End\s+tarfiles")
1626            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1627                    "^#\s+End\s+rpms")
1628
1629            # Working on the split data
1630            for line in split_data:
1631                line = line.rstrip()
1632                if parse_current_testbed(line, master, allocated, tbparams):
1633                    continue
1634                elif parse_allbeds(line, user, tbparams, master, export_project,
1635                        access_user):
1636                    continue
1637                elif parse_gateways(line, allocated, tbparams):
1638                    continue
1639                elif parse_vtopo(line):
1640                    continue
1641                elif parse_hostnames(line):
1642                    continue
1643                elif parse_tarfiles(line):
1644                    continue
1645                elif parse_rpms(line):
1646                    continue
1647                else:
1648                    raise service_error(service_error.internal, 
1649                            "Bad tcl parse? %s" % line)
1650            # Virtual topology and visualization
1651            vtopo = self.gentopo(parse_vtopo.str)
1652            if not vtopo:
1653                raise service_error(service_error.internal, 
1654                        "Failed to generate virtual topology")
1655
1656            vis = self.genviz(vtopo)
1657            if not vis:
1658                raise service_error(service_error.internal, 
1659                        "Failed to generate visualization")
1660           
1661            # save federant information
1662            for k in allocated.keys():
1663                tbparams[k]['federant'] = {\
1664                        'name': [ { 'localname' : eid} ],\
1665                        'emulab': tbparams[k]['emulab'],\
1666                        'allocID' : tbparams[k]['allocID'],\
1667                        'master' : k == master,\
1668                    }
1669
1670
1671            # Copy tarfiles and rpms needed at remote sites into a staging area
1672            try:
1673                if self.fedkit:
1674                    parse_tarfiles.list.append(self.fedkit)
1675                for t in parse_tarfiles.list:
1676                    if not os.path.exists("%s/tarfiles" % tmpdir):
1677                        os.mkdir("%s/tarfiles" % tmpdir)
1678                    self.copy_file(t, "%s/tarfiles/%s" % \
1679                            (tmpdir, os.path.basename(t)))
1680                for r in parse_rpms.list:
1681                    if not os.path.exists("%s/rpms" % tmpdir):
1682                        os.mkdir("%s/rpms" % tmpdir)
1683                    self.copy_file(r, "%s/rpms/%s" % \
1684                            (tmpdir, os.path.basename(r)))
1685            except IOError, e:
1686                raise service_error(service_error.internal, 
1687                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1688
1689        except service_error, e:
1690            # If something goes wrong in the parse (usually an access error)
1691            # clear the placeholder state.  From here on out the code delays
1692            # exceptions.
1693            self.state_lock.acquire()
1694            del self.state[eid]
1695            self.state_lock.release()
1696            raise e
1697
1698        thread_pool_info = self.thread_pool()
1699        threads = [ ]
1700
1701        for tb in [ k for k in allocated.keys() if k != master]:
1702            # Wait until we have a free slot to start the next testbed load
1703            thread_pool_info.acquire()
1704            while thread_pool_info.started - \
1705                    thread_pool_info.terminated >= self.nthreads:
1706                thread_pool_info.wait()
1707            thread_pool_info.release()
1708
1709            # Create and start a thread to start the segment, and save it to
1710            # get the return value later
1711            t  = self.pooled_thread(target=self.start_segment, 
1712                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1713                    pdata=thread_pool_info, trace_file=self.trace_file)
1714            threads.append(t)
1715            t.start()
1716
1717        # Wait until all finish (the first clause of the while is to make sure
1718        # one starts)
1719        thread_pool_info.acquire()
1720        while thread_pool_info.started == 0 or \
1721                thread_pool_info.started > thread_pool_info.terminated:
1722            thread_pool_info.wait()
1723        thread_pool_info.release()
1724
1725        # If none failed, start the master
1726        failed = [ t.getName() for t in threads if not t.rv ]
1727
1728        if len(failed) == 0:
1729            if not self.start_segment(master, eid, tbparams, tmpdir):
1730                failed.append(master)
1731
1732        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1733        # If one failed clean up, unless fail_soft is set
1734        if failed:
1735            if not fail_soft:
1736                for tb in succeeded:
1737                    self.stop_segment(tb, eid, tbparams)
1738                # Remove the placeholder
1739                self.state_lock.acquire()
1740                del self.state[eid]
1741                self.state_lock.release()
1742
1743                raise service_error(service_error.federant,
1744                    "Swap in failed on %s" % ",".join(failed))
1745        else:
1746            self.log.info("[start_segment]: Experiment %s started" % eid)
1747
1748        # Generate an ID for the experiment (slice) and a certificate that the
1749        # allocator can use to prove they own it.  We'll ship it back through
1750        # the encrypted connection.
1751        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1752
1753        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1754
1755        # Walk up tmpdir, deleting as we go
1756        for path, dirs, files in os.walk(tmpdir, topdown=False):
1757            for f in files:
1758                os.remove(os.path.join(path, f))
1759            for d in dirs:
1760                os.rmdir(os.path.join(path, d))
1761        os.rmdir(tmpdir)
1762
1763        # The deepcopy prevents the allocation ID and other binaries from being
1764        # translated into other formats
1765        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1766                for tb in tbparams.keys() \
1767                    if tbparams[tb].has_key('federant') ],\
1768                    'vtopo': vtopo,\
1769                    'vis' : vis,
1770                    'experimentID' : [\
1771                            { 'fedid': copy.copy(expid) }, \
1772                            { 'localname': eid },\
1773                        ],\
1774                    'experimentAccess': { 'X509' : expcert },\
1775                }
1776
1777        # Insert the experiment into our state and update the disk copy
1778        self.state_lock.acquire()
1779        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1780                for tb in tbparams.keys() \
1781                    if tbparams[tb].has_key('federant') ],\
1782                    'vtopo': vtopo,\
1783                    'vis' : vis,
1784                    'owner': fid,
1785                    'experimentID' : [\
1786                            { 'fedid': expid }, { 'localname': eid },\
1787                        ],\
1788                }
1789        self.state[eid] = self.state[expid]
1790        if self.state_filename: self.write_state()
1791        self.state_lock.release()
1792
1793        self.auth.set_attribute(fid, expid)
1794        self.auth.set_attribute(expid, expid)
1795
1796        if not failed:
1797            return resp
1798        else:
1799            raise service_error(service_error.partial, \
1800                    "Partial swap in on %s" % ",".join(succeeded))
1801
1802    def check_experiment_access(self, fid, key):
1803        """
1804        Confirm that the fid has access to the experiment.  Though a request
1805        may be made in terms of a local name, the access attribute is always
1806        the experiment's fedid.
1807        """
1808        if not isinstance(key, fedid):
1809            self.state_lock.acquire()
1810            if self.state.has_key(key):
1811                try:
1812                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1813                            if f.has_key('fedid') ]
1814                except KeyError:
1815                    self.state_lock.release()
1816                    raise service_error(service_error.internal, 
1817                            "No fedid for experiment %s when checking " +\
1818                                    "access(!?)" % key)
1819                if len(kl) == 1:
1820                    key = kl[0]
1821                else:
1822                    self.state_lock.release()
1823                    raise service_error(service_error.internal, 
1824                            "multiple fedids for experiment %s when " +\
1825                                    "checking access(!?)" % key)
1826            else:
1827                self.state_lock.release()
1828                raise service_error(service_error.access, "Access Denied")
1829            self.state_lock.release()
1830
1831        if self.auth.check_attribute(fid, key):
1832            return True
1833        else:
1834            raise service_error(service_error.access, "Access Denied")
1835
1836
1837
1838    def get_vtopo(self, req, fid):
1839        """
1840        Return the stored virtual topology for this experiment
1841        """
1842        rv = None
1843
1844        req = req.get('VtopoRequestBody', None)
1845        if not req:
1846            raise service_error(service_error.req,
1847                    "Bad request format (no VtopoRequestBody)")
1848        exp = req.get('experiment', None)
1849        if exp:
1850            if exp.has_key('fedid'):
1851                key = exp['fedid']
1852                keytype = "fedid"
1853            elif exp.has_key('localname'):
1854                key = exp['localname']
1855                keytype = "localname"
1856            else:
1857                raise service_error(service_error.req, "Unknown lookup type")
1858        else:
1859            raise service_error(service_error.req, "No request?")
1860
1861        self.check_experiment_access(fid, key)
1862
1863        self.state_lock.acquire()
1864        if self.state.has_key(key):
1865            rv = { 'experiment' : {keytype: key },\
1866                    'vtopo': self.state[key]['vtopo'],\
1867                }
1868        self.state_lock.release()
1869
1870        if rv: return rv
1871        else: raise service_error(service_error.req, "No such experiment")
1872
1873    def get_vis(self, req, fid):
1874        """
1875        Return the stored visualization for this experiment
1876        """
1877        rv = None
1878
1879        req = req.get('VisRequestBody', None)
1880        if not req:
1881            raise service_error(service_error.req,
1882                    "Bad request format (no VisRequestBody)")
1883        exp = req.get('experiment', None)
1884        if exp:
1885            if exp.has_key('fedid'):
1886                key = exp['fedid']
1887                keytype = "fedid"
1888            elif exp.has_key('localname'):
1889                key = exp['localname']
1890                keytype = "localname"
1891            else:
1892                raise service_error(service_error.req, "Unknown lookup type")
1893        else:
1894            raise service_error(service_error.req, "No request?")
1895
1896        self.check_experiment_access(fid, key)
1897
1898        self.state_lock.acquire()
1899        if self.state.has_key(key):
1900            rv =  { 'experiment' : {keytype: key },\
1901                    'vis': self.state[key]['vis'],\
1902                    }
1903        self.state_lock.release()
1904
1905        if rv: return rv
1906        else: raise service_error(service_error.req, "No such experiment")
1907
1908    def get_info(self, req, fid):
1909        """
1910        Return all the stored info about this experiment
1911        """
1912        rv = None
1913
1914        req = req.get('InfoRequestBody', None)
1915        if not req:
1916            raise service_error(service_error.req,
1917                    "Bad request format (no VisRequestBody)")
1918        exp = req.get('experiment', None)
1919        if exp:
1920            if exp.has_key('fedid'):
1921                key = exp['fedid']
1922                keytype = "fedid"
1923            elif exp.has_key('localname'):
1924                key = exp['localname']
1925                keytype = "localname"
1926            else:
1927                raise service_error(service_error.req, "Unknown lookup type")
1928        else:
1929            raise service_error(service_error.req, "No request?")
1930
1931        self.check_experiment_access(fid, key)
1932
1933        # The state may be massaged by the service function that called
1934        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1935        # state.
1936        self.state_lock.acquire()
1937        if self.state.has_key(key):
1938            rv = copy.deepcopy(self.state[key])
1939        self.state_lock.release()
1940
1941        if rv: return rv
1942        else: raise service_error(service_error.req, "No such experiment")
1943
1944
1945    def terminate_experiment(self, req, fid):
1946        """
1947        Swap this experiment out on the federants and delete the shared
1948        information
1949        """
1950        tbparams = { }
1951        req = req.get('TerminateRequestBody', None)
1952        if not req:
1953            raise service_error(service_error.req,
1954                    "Bad request format (no TerminateRequestBody)")
1955        exp = req.get('experiment', None)
1956        if exp:
1957            if exp.has_key('fedid'):
1958                key = exp['fedid']
1959                keytype = "fedid"
1960            elif exp.has_key('localname'):
1961                key = exp['localname']
1962                keytype = "localname"
1963            else:
1964                raise service_error(service_error.req, "Unknown lookup type")
1965        else:
1966            raise service_error(service_error.req, "No request?")
1967
1968        self.check_experiment_access(fid, key)
1969
1970        self.state_lock.acquire()
1971        fed_exp = self.state.get(key, None)
1972
1973        if fed_exp:
1974            # This branch of the conditional holds the lock to generate a
1975            # consistent temporary tbparams variable to deallocate experiments.
1976            # It releases the lock to do the deallocations and reacquires it to
1977            # remove the experiment state when the termination is complete.
1978            ids = []
1979            #  experimentID is a list of dicts that are self-describing
1980            #  identifiers.  This finds all the fedids and localnames - the
1981            #  keys of self.state - and puts them into ids.
1982            for id in fed_exp.get('experimentID', []):
1983                if id.has_key('fedid'): ids.append(id['fedid'])
1984                if id.has_key('localname'): ids.append(id['localname'])
1985
1986            # Construct enough of the tbparams to make the stop_segment calls
1987            # work
1988            for fed in fed_exp['federant']:
1989                try:
1990                    for e in fed['name']:
1991                        eid = e.get('localname', None)
1992                        if eid: break
1993                    else:
1994                        continue
1995
1996                    p = fed['emulab']['project']
1997
1998                    project = p['name']['localname']
1999                    tb = p['testbed']['localname']
2000                    user = p['user'][0]['userID']['localname']
2001
2002                    domain = fed['emulab']['domain']
2003                    host  = "%s%s" % (fed['emulab']['ops'], domain)
2004                    aid = fed['allocID']
2005                except KeyError, e:
2006                    continue
2007                tbparams[tb] = {\
2008                        'user': user,\
2009                        'domain': domain,\
2010                        'project': project,\
2011                        'host': host,\
2012                        'eid': eid,\
2013                        'aid': aid,\
2014                    }
2015            self.state_lock.release()
2016
2017            # Stop everyone.
2018            for tb in tbparams.keys():
2019                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2020
2021            # release the allocations
2022            for tb in tbparams.keys():
2023                self.release_access(tb, tbparams[tb]['aid'])
2024
2025            # Remove the terminated experiment
2026            self.state_lock.acquire()
2027            for id in ids:
2028                if self.state.has_key(id): del self.state[id]
2029
2030            if self.state_filename: self.write_state()
2031            self.state_lock.release()
2032
2033            return { 'experiment': exp }
2034        else:
2035            # Don't forget to release the lock
2036            self.state_lock.release()
2037            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.