source: fedd/federation/experiment_control.py @ 7ca25b3

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 7ca25b3 was 7ca25b3, checked in by Ted Faber <faber@…>, 15 years ago

Fix deallocation on partial swapin failure

  • Property mode set to 100644
File size: 61.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = config.get("experiment_control", "cert_file")
152        if self.cert_file:
153            self.cert_pwd = config.get("experiment_control", "cert_pwd")
154        else:
155            self.cert_file = config.get("globals", "cert_file")
156            self.cert_pwd = config.get("globals", "cert_pwd")
157
158        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
159                or config.get("globals", "trusted_certs")
160
161        self.exp_stem = "fed-stem"
162        self.log = logging.getLogger("fedd.experiment_control")
163        set_log_level(config, "experiment_control", self.log)
164        self.muxmax = 2
165        self.nthreads = 2
166        self.randomize_experiments = False
167
168        self.scp_exec = "/usr/bin/scp"
169        self.splitter = None
170        self.ssh_exec="/usr/bin/ssh"
171        self.ssh_keygen = "/usr/bin/ssh-keygen"
172        self.ssh_identity_file = None
173
174
175        self.debug = config.getboolean("experiment_control", "create_debug")
176        self.state_filename = config.get("experiment_control", 
177                "experiment_state_file")
178        self.splitter_url = config.get("experiment_control", "splitter_url")
179        self.fedkit = config.get("experiment_control", "fedkit")
180        accessdb_file = config.get("experiment_control", "accessdb")
181
182        self.ssh_pubkey_file = config.get("experiment_control", 
183                "ssh_pubkey_file")
184        self.ssh_privkey_file = config.get("experiment_control",
185                "ssh_privkey_file")
186        # NB for internal master/slave ops, not experiment setup
187        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
188        self.state = { }
189        self.state_lock = Lock()
190        self.tclsh = "/usr/local/bin/otclsh"
191        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
192                config.get("experiment_control", "tcl_splitter",
193                        "/usr/testbed/lib/ns2ir/parse.tcl")
194        mapdb_file = config.get("experiment_control", "mapdb")
195        self.trace_file = sys.stderr
196
197        self.def_expstart = \
198                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
199                "/tmp/federate";
200        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
201                "FEDDIR/hosts";
202        self.def_gwstart = \
203                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
204                "/tmp/bridge.log";
205        self.def_mgwstart = \
206                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
207                "/tmp/bridge.log";
208        self.def_gwimage = "FBSD61-TUNNEL2";
209        self.def_gwtype = "pc";
210        self.local_access = { }
211
212        if auth:
213            self.auth = auth
214        else:
215            self.log.error(\
216                    "[access]: No authorizer initialized, creating local one.")
217            auth = authorizer()
218
219
220        if self.ssh_pubkey_file:
221            try:
222                f = open(self.ssh_pubkey_file, 'r')
223                self.ssh_pubkey = f.read()
224                f.close()
225            except IOError:
226                raise service_error(service_error.internal,
227                        "Cannot read sshpubkey")
228        else:
229            raise service_error(service_error.internal, 
230                    "No SSH public key file?")
231
232        if not self.ssh_privkey_file:
233            raise service_error(service_error.internal, 
234                    "No SSH public key file?")
235
236
237        if mapdb_file:
238            self.read_mapdb(mapdb_file)
239        else:
240            self.log.warn("[experiment_control] No testbed map, using defaults")
241            self.tbmap = { 
242                    'deter':'https://users.isi.deterlab.net:23235',
243                    'emulab':'https://users.isi.deterlab.net:23236',
244                    'ucb':'https://users.isi.deterlab.net:23237',
245                    }
246
247        if accessdb_file:
248                self.read_accessdb(accessdb_file)
249        else:
250            raise service_error(service_error.internal,
251                    "No accessdb specified in config")
252
253        # Grab saved state.  OK to do this w/o locking because it's read only
254        # and only one thread should be in existence that can see self.state at
255        # this point.
256        if self.state_filename:
257            self.read_state()
258
259        # Dispatch tables
260        self.soap_services = {\
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'Terminate': soap_handler('Terminate', 
266                    self.terminate_experiment),
267        }
268
269        self.xmlrpc_services = {\
270                'Create': xmlrpc_handler('Create', self.create_experiment),
271                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
272                'Vis': xmlrpc_handler('Vis', self.get_vis),
273                'Info': xmlrpc_handler('Info', self.get_info),
274                'Terminate': xmlrpc_handler('Terminate',
275                    self.terminate_experiment),
276        }
277
278    def copy_file(self, src, dest, size=1024):
279        """
280        Exceedingly simple file copy.
281        """
282        s = open(src,'r')
283        d = open(dest, 'w')
284
285        buf = "x"
286        while buf != "":
287            buf = s.read(size)
288            d.write(buf)
289        s.close()
290        d.close()
291
292    # Call while holding self.state_lock
293    def write_state(self):
294        """
295        Write a new copy of experiment state after copying the existing state
296        to a backup.
297
298        State format is a simple pickling of the state dictionary.
299        """
300        if os.access(self.state_filename, os.W_OK):
301            self.copy_file(self.state_filename, \
302                    "%s.bak" % self.state_filename)
303        try:
304            f = open(self.state_filename, 'w')
305            pickle.dump(self.state, f)
306        except IOError, e:
307            self.log.error("Can't write file %s: %s" % \
308                    (self.state_filename, e))
309        except pickle.PicklingError, e:
310            self.log.error("Pickling problem: %s" % e)
311        except TypeError, e:
312            self.log.error("Pickling problem (TypeError): %s" % e)
313
314    # Call while holding self.state_lock
315    def read_state(self):
316        """
317        Read a new copy of experiment state.  Old state is overwritten.
318
319        State format is a simple pickling of the state dictionary.
320        """
321        try:
322            f = open(self.state_filename, "r")
323            self.state = pickle.load(f)
324            self.log.debug("[read_state]: Read state from %s" % \
325                    self.state_filename)
326        except IOError, e:
327            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
328                    % (self.state_filename, e))
329        except pickle.UnpicklingError, e:
330            self.log.warning(("[read_state]: No saved state: " + \
331                    "Unpickling failed: %s") % e)
332       
333        for k in self.state.keys():
334            try:
335                # This list should only have one element in it, but phrasing it
336                # as a for loop doesn't cost much, really.  We have to find the
337                # fedid elements anyway.
338                for eid in [ f['fedid'] \
339                        for f in self.state[k]['experimentID']\
340                            if f.has_key('fedid') ]:
341                    self.auth.set_attribute(self.state[k]['owner'], eid)
342            except KeyError, e:
343                self.log.warning("[read_state]: State ownership or identity " +\
344                        "misformatted in %s: %s" % (self.state_filename, e))
345
346
347    def read_accessdb(self, accessdb_file):
348        """
349        Read the mapping from fedids that can create experiments to their name
350        in the 3-level access namespace.  All will be asserted from this
351        testbed and can include the local username and porject that will be
352        asserted on their behalf by this fedd.  Each fedid is also added to the
353        authorization system with the "create" attribute.
354        """
355        self.accessdb = {}
356        # These are the regexps for parsing the db
357        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
358        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
359                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
360        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
361                "\s*->\s*(" + name_expr + ")\s*$")
362        lineno = 0
363
364        # Parse the mappings and store in self.authdb, a dict of
365        # fedid -> (proj, user)
366        try:
367            f = open(accessdb_file, "r")
368            for line in f:
369                lineno += 1
370                line = line.strip()
371                if len(line) == 0 or line.startswith('#'):
372                    continue
373                m = project_line.match(line)
374                if m:
375                    fid = fedid(hexstr=m.group(1))
376                    project, user = m.group(2,3)
377                    if not self.accessdb.has_key(fid):
378                        self.accessdb[fid] = []
379                    self.accessdb[fid].append((project, user))
380                    continue
381
382                m = user_line.match(line)
383                if m:
384                    fid = fedid(hexstr=m.group(1))
385                    project = None
386                    user = m.group(2)
387                    if not self.accessdb.has_key(fid):
388                        self.accessdb[fid] = []
389                    self.accessdb[fid].append((project, user))
390                    continue
391                self.log.warn("[experiment_control] Error parsing access " +\
392                        "db %s at line %d" %  (accessdb_file, lineno))
393        except IOError:
394            raise service_error(service_error.internal,
395                    "Error opening/reading %s as experiment " +\
396                            "control accessdb" %  accessdb_file)
397        f.close()
398
399        # Initialize the authorization attributes
400        for fid in self.accessdb.keys():
401            self.auth.set_attribute(fid, 'create')
402
403    def read_mapdb(self, file):
404        """
405        Read a simple colon separated list of mappings for the
406        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
407        """
408
409        self.tbmap = { }
410        lineno =0
411        try:
412            f = open(file, "r")
413            for line in f:
414                lineno += 1
415                line = line.strip()
416                if line.startswith('#') or len(line) == 0:
417                    continue
418                try:
419                    label, url = line.split(':', 1)
420                    self.tbmap[label] = url
421                except ValueError, e:
422                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
423                            "map db: %s %s" % (lineno, line, e))
424        except IOError, e:
425            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
426                    "open %s: %s" % (file, e))
427        f.close()
428
429    def scp_file(self, file, user, host, dest=""):
430        """
431        scp a file to the remote host.  If debug is set the action is only
432        logged.
433        """
434
435        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', '-i', 
436                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
437        rv = 0
438
439        try:
440            dnull = open("/dev/null", "w")
441        except IOError:
442            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
443            dnull = Null
444
445        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
446        if not self.debug:
447            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
448
449        return rv == 0
450
451    def ssh_cmd(self, user, host, cmd, wname=None):
452        """
453        Run a remote command on host as user.  If debug is set, the action is
454        only logged.
455        """
456        sh_str = "%s -o 'IdentitiesOnly yes' -i %s %s@%s %s" % \
457                (self.ssh_exec, self.ssh_privkey_file, 
458                        user, host, cmd)
459
460        try:
461            dnull = open("/dev/null", "r")
462        except IOError:
463            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
464            dnull = Null
465
466        self.log.debug("[ssh_cmd]: %s" % sh_str)
467        if not self.debug:
468            if dnull:
469                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
470            else:
471                sub = Popen(sh_str, shell=True)
472            return sub.wait() == 0
473        else:
474            return True
475
476    def ship_configs(self, host, user, src_dir, dest_dir):
477        """
478        Copy federant-specific configuration files to the federant.
479        """
480        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
481            return False
482        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
483            return False
484
485        for f in os.listdir(src_dir):
486            if os.path.isdir(f):
487                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
488                        "%s/%s" % (dest_dir, f)):
489                    return False
490            else:
491                if not self.scp_file("%s/%s" % (src_dir, f), 
492                        user, host, dest_dir):
493                    return False
494        return True
495
496    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
497        """
498        Start a sub-experiment on a federant.
499
500        Get the current state, modify or create as appropriate, ship data and
501        configs and start the experiment.  There are small ordering differences
502        based on the initial state of the sub-experiment.
503        """
504        # ops node in the federant
505        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
506        user = tbparams[tb]['user']     # federant user
507        pid = tbparams[tb]['project']   # federant project
508        # XXX
509        base_confs = ( "hosts",)
510        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
511        # command to test experiment state
512        expinfo_exec = "/usr/testbed/bin/expinfo" 
513        # Configuration directories on the remote machine
514        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
515        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
516        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
517        # Regular expressions to parse the expinfo response
518        state_re = re.compile("State:\s+(\w+)")
519        no_exp_re = re.compile("^No\s+such\s+experiment")
520        state = None    # Experiment state parsed from expinfo
521        # The expinfo ssh command.  Note the identity restriction to use only
522        # the identity provided in the pubkey given.
523        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-i', 
524                self.ssh_privkey_file, "%s@%s" % (user, host), 
525                expinfo_exec, pid, eid]
526
527        # Get status
528        self.log.debug("[start_segment]: %s"% " ".join(cmd))
529        dev_null = None
530        try:
531            dev_null = open("/dev/null", "a")
532        except IOError, e:
533            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
534
535        if self.debug:
536            state = 'swapped'
537            rv = 0
538        else:
539            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
540            for line in status.stdout:
541                m = state_re.match(line)
542                if m: state = m.group(1)
543                else:
544                    m = no_exp_re.match(line)
545                    if m: state = "none"
546            rv = status.wait()
547
548        # If the experiment is not present the subcommand returns a non-zero
549        # return value.  If we successfully parsed a "none" outcome, ignore the
550        # return code.
551        if rv != 0 and state != "none":
552            raise service_error(service_error.internal,
553                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
554
555        self.log.debug("[start_segment]: %s: %s" % (tb, state))
556        self.log.info("[start_segment]:transferring experiment to %s" % tb)
557
558        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
559            return False
560        # Clear the federation config dirs
561        if not self.ssh_cmd(user, host, 
562                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
563            return False
564        # Clear and create the tarfiles and rpm directories
565        for d in (tarfiles_dir, rpms_dir):
566            if not self.ssh_cmd(user, host, 
567                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
568                return False
569            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
570                    "create tarfiles"):
571                return False
572       
573        if state == 'active':
574            # Create the federation config dirs (do not move outside the
575            # conditional.  Happens later in new expriment creation)
576            if not self.ssh_cmd(user, host, 
577                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
578                return False
579            # Remote experiment is active.  Modify it.
580            for f in base_confs:
581                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
582                        "%s/%s" % (proj_dir, f)):
583                    return False
584            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
585                    proj_dir):
586                return False
587            if os.path.isdir("%s/tarfiles" % tmpdir):
588                if not self.ship_configs(host, user,
589                        "%s/tarfiles" % tmpdir, tarfiles_dir):
590                    return False
591            if os.path.isdir("%s/rpms" % tmpdir):
592                if not self.ship_configs(host, user,
593                        "%s/rpms" % tmpdir, tarfiles_dir):
594                    return False
595            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
596            if not self.ssh_cmd(user, host,
597                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
598                            (pid, eid, tclfile), "modexp"):
599                return False
600            return True
601        elif state == "swapped":
602            # Create the federation config dirs (do not move outside the
603            # conditional.  Happens later in new expriment creation)
604            if not self.ssh_cmd(user, host, 
605                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
606                return False
607            # Remote experiment swapped out.  Modify it and swap it in.
608            for f in base_confs:
609                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
610                        "%s/%s" % (proj_dir, f)):
611                    return False
612            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
613                    proj_dir):
614                return False
615            if os.path.isdir("%s/tarfiles" % tmpdir):
616                if not self.ship_configs(host, user,
617                        "%s/tarfiles" % tmpdir, tarfiles_dir):
618                    return False
619            if os.path.isdir("%s/rpms" % tmpdir):
620                if not self.ship_configs(host, user,
621                        "%s/rpms" % tmpdir, tarfiles_dir):
622                    return False
623            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
624            if not self.ssh_cmd(user, host,
625                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
626                    "modexp"):
627                return False
628            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
629            if not self.ssh_cmd(user, host,
630                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
631                    "swapexp"):
632                return False
633            return True
634        elif state == "none":
635            # No remote experiment.  Create one.  We do this in 2 steps so we
636            # can put the configuration files and scripts into the new
637            # experiment directories.
638
639            # Tarfiles must be present for creation to work
640            if os.path.isdir("%s/tarfiles" % tmpdir):
641                if not self.ship_configs(host, user,
642                        "%s/tarfiles" % tmpdir, tarfiles_dir):
643                    return False
644            if os.path.isdir("%s/rpms" % tmpdir):
645                if not self.ship_configs(host, user,
646                        "%s/rpms" % tmpdir, tarfiles_dir):
647                    return False
648            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
649            if not self.ssh_cmd(user, host,
650                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
651                            (pid, eid, tclfile), "startexp"):
652                return False
653            # Create the federation config dirs (do not move outside the
654            # conditional.)
655            if not self.ssh_cmd(user, host, 
656                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
657                return False
658            # After startexp the per-experiment directories exist
659            for f in base_confs:
660                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
661                        "%s/%s" % (proj_dir, f)):
662                    return False
663            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
664                    proj_dir):
665                return False
666            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
667            if not self.ssh_cmd(user, host,
668                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
669                    "swapexp"):
670                return False
671            return True
672        else:
673            self.log.debug("[start_segment]:unknown state %s" % state)
674            return False
675
676    def stop_segment(self, tb, eid, tbparams):
677        """
678        Stop a sub experiment by calling swapexp on the federant
679        """
680        user = tbparams[tb]['user']
681        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
682        pid = tbparams[tb]['project']
683
684        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
685        return self.ssh_cmd(user, host,
686                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
687
688       
689    def generate_ssh_keys(self, dest, type="rsa" ):
690        """
691        Generate a set of keys for the gateways to use to talk.
692
693        Keys are of type type and are stored in the required dest file.
694        """
695        valid_types = ("rsa", "dsa")
696        t = type.lower();
697        if t not in valid_types: raise ValueError
698        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
699
700        try:
701            trace = open("/dev/null", "w")
702        except IOError:
703            raise service_error(service_error.internal,
704                    "Cannot open /dev/null??");
705
706        # May raise CalledProcessError
707        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
708        rv = call(cmd, stdout=trace, stderr=trace)
709        if rv != 0:
710            raise service_error(service_error.internal, 
711                    "Cannot generate nonce ssh keys.  %s return code %d" \
712                            % (self.ssh_keygen, rv))
713
714    def gentopo(self, str):
715        """
716        Generate the topology dtat structure from the splitter's XML
717        representation of it.
718
719        The topology XML looks like:
720            <experiment>
721                <nodes>
722                    <node><vname></vname><ips>ip1:ip2</ips></node>
723                </nodes>
724                <lans>
725                    <lan>
726                        <vname></vname><vnode></vnode><ip></ip>
727                        <bandwidth></bandwidth><member>node:port</member>
728                    </lan>
729                </lans>
730        """
731        class topo_parse:
732            """
733            Parse the topology XML and create the dats structure.
734            """
735            def __init__(self):
736                # Typing of the subelements for data conversion
737                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
738                self.int_subelements = ( 'bandwidth',)
739                self.float_subelements = ( 'delay',)
740                # The final data structure
741                self.nodes = [ ]
742                self.lans =  [ ]
743                self.topo = { \
744                        'node': self.nodes,\
745                        'lan' : self.lans,\
746                    }
747                self.element = { }  # Current element being created
748                self.chars = ""     # Last text seen
749
750            def end_element(self, name):
751                # After each sub element the contents is added to the current
752                # element or to the appropriate list.
753                if name == 'node':
754                    self.nodes.append(self.element)
755                    self.element = { }
756                elif name == 'lan':
757                    self.lans.append(self.element)
758                    self.element = { }
759                elif name in self.str_subelements:
760                    self.element[name] = self.chars
761                    self.chars = ""
762                elif name in self.int_subelements:
763                    self.element[name] = int(self.chars)
764                    self.chars = ""
765                elif name in self.float_subelements:
766                    self.element[name] = float(self.chars)
767                    self.chars = ""
768
769            def found_chars(self, data):
770                self.chars += data.rstrip()
771
772
773        tp = topo_parse();
774        parser = xml.parsers.expat.ParserCreate()
775        parser.EndElementHandler = tp.end_element
776        parser.CharacterDataHandler = tp.found_chars
777
778        parser.Parse(str)
779
780        return tp.topo
781       
782
783    def genviz(self, topo):
784        """
785        Generate the visualization the virtual topology
786        """
787
788        neato = "/usr/local/bin/neato"
789        # These are used to parse neato output and to create the visualization
790        # file.
791        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
792        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
793                "%s</type></node>"
794
795        try:
796            # Node names
797            nodes = [ n['vname'] for n in topo['node'] ]
798            topo_lans = topo['lan']
799        except KeyError:
800            raise service_error(service_error.internal, "Bad topology")
801
802        lans = { }
803        links = { }
804
805        # Walk through the virtual topology, organizing the connections into
806        # 2-node connections (links) and more-than-2-node connections (lans).
807        # When a lan is created, it's added to the list of nodes (there's a
808        # node in the visualization for the lan).
809        for l in topo_lans:
810            if links.has_key(l['vname']):
811                if len(links[l['vname']]) < 2:
812                    links[l['vname']].append(l['vnode'])
813                else:
814                    nodes.append(l['vname'])
815                    lans[l['vname']] = links[l['vname']]
816                    del links[l['vname']]
817                    lans[l['vname']].append(l['vnode'])
818            elif lans.has_key(l['vname']):
819                lans[l['vname']].append(l['vnode'])
820            else:
821                links[l['vname']] = [ l['vnode'] ]
822
823
824        # Open up a temporary file for dot to turn into a visualization
825        try:
826            df, dotname = tempfile.mkstemp()
827            dotfile = os.fdopen(df, 'w')
828        except IOError:
829            raise service_error(service_error.internal,
830                    "Failed to open file in genviz")
831
832        # Generate a dot/neato input file from the links, nodes and lans
833        try:
834            print >>dotfile, "graph G {"
835            for n in nodes:
836                print >>dotfile, '\t"%s"' % n
837            for l in links.keys():
838                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
839            for l in lans.keys():
840                for n in lans[l]:
841                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
842            print >>dotfile, "}"
843            dotfile.close()
844        except TypeError:
845            raise service_error(service_error.internal,
846                    "Single endpoint link in vtopo")
847        except IOError:
848            raise service_error(service_error.internal, "Cannot write dot file")
849
850        # Use dot to create a visualization
851        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
852                '-Gpack=true', dotname], stdout=PIPE)
853
854        # Translate dot to vis format
855        vis_nodes = [ ]
856        vis = { 'node': vis_nodes }
857        for line in dot.stdout:
858            m = vis_re.match(line)
859            if m:
860                vn = m.group(1)
861                vis_node = {'name': vn, \
862                        'x': float(m.group(2)),\
863                        'y' : float(m.group(3)),\
864                    }
865                if vn in links.keys() or vn in lans.keys():
866                    vis_node['type'] = 'lan'
867                else:
868                    vis_node['type'] = 'node'
869                vis_nodes.append(vis_node)
870        rv = dot.wait()
871
872        os.remove(dotname)
873        if rv == 0 : return vis
874        else: return None
875
876    def get_access(self, tb, nodes, user, tbparam, master, export_project,
877            access_user):
878        """
879        Get access to testbed through fedd and set the parameters for that tb
880        """
881
882        # XXX This is needless complexity.  It must vanish.
883        translate_attr = {
884            'slavenodestartcmd': 'expstart',
885            'slaveconnectorstartcmd': 'gwstart',
886            'masternodestartcmd': 'mexpstart',
887            'masterconnectorstartcmd': 'mgwstart',
888            'connectorimage': 'gwimage',
889            'connectortype': 'gwtype',
890            'tunnelcfg': 'tun',
891            'tunnelinterface': 'tunnelinterface',
892            'smbshare': 'smbshare',
893        }
894
895        uri = self.tbmap.get(tb, None)
896        if not uri:
897            raise service_error(serice_error.server_config, 
898                    "Unknown testbed: %s" % tb)
899
900        # currently this lumps all users into one service access group
901        service_keys = [ a for u in user \
902                for a in u.get('access', []) \
903                    if a.has_key('sshPubkey')]
904
905        if len(service_keys) == 0:
906            raise service_error(service_error.req, 
907                    "Must have at least one SSH pubkey for services")
908
909
910        for p, u in access_user:
911            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
912                    "to %s") %  ((p or "None"), u, uri))
913
914            if p:
915                # Request with user and project specified
916                req = {\
917                        'destinationTestbed' : { 'uri' : uri },
918                        'project': { 
919                            'name': {'localname': p},
920                            'user': [ {'userID': { 'localname': u } } ],
921                            },
922                        'user':  user,
923                        'allocID' : { 'localname': 'test' },
924                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
925                        'serviceAccess' : service_keys
926                    }
927            else:
928                # Request with only user specified
929                req = {\
930                        'destinationTestbed' : { 'uri' : uri },
931                        'user':  [ {'userID': { 'localname': u } } ],
932                        'allocID' : { 'localname': 'test' },
933                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
934                        'serviceAccess' : service_keys
935                    }
936
937            if tb == master:
938                # NB, the export_project parameter is a dict that includes
939                # the type
940                req['exportProject'] = export_project
941
942            # node resources if any
943            if nodes != None and len(nodes) > 0:
944                rnodes = [ ]
945                for n in nodes:
946                    rn = { }
947                    image, hw, count = n.split(":")
948                    if image: rn['image'] = [ image ]
949                    if hw: rn['hardware'] = [ hw ]
950                    if count and int(count) >0 : rn['count'] = int(count)
951                    rnodes.append(rn)
952                req['resources']= { }
953                req['resources']['node'] = rnodes
954
955            try:
956                if self.local_access.has_key(uri):
957                    # Local access call
958                    req = { 'RequestAccessRequestBody' : req }
959                    r = self.local_access[uri].RequestAccess(req, 
960                            fedid(file=self.cert_file))
961                    r = { 'RequestAccessResponseBody' : r }
962                else:
963                    r = self.call_RequestAccess(uri, req, 
964                            self.cert_file, self.cert_pwd, self.trusted_certs)
965            except service_error, e:
966                if e.code == service_error.access:
967                    self.log.debug("[get_access] Access denied")
968                    r = None
969                    continue
970                else:
971                    raise e
972
973            if r.has_key('RequestAccessResponseBody'):
974                # Through to here we have a valid response, not a fault.
975                # Access denied is a fault, so something better or worse than
976                # access denied has happened.
977                r = r['RequestAccessResponseBody']
978                self.log.debug("[get_access] Access granted")
979                break
980            else:
981                raise service_error(service_error.protocol,
982                        "Bad proxy response")
983       
984        if not r:
985            raise service_error(service_error.access, 
986                    "Access denied by %s (%s)" % (tb, uri))
987
988        e = r['emulab']
989        p = e['project']
990        tbparam[tb] = { 
991                "boss": e['boss'],
992                "host": e['ops'],
993                "domain": e['domain'],
994                "fs": e['fileServer'],
995                "eventserver": e['eventServer'],
996                "project": unpack_id(p['name']),
997                "emulab" : e,
998                "allocID" : r['allocID'],
999                }
1000        # Make the testbed name be the label the user applied
1001        p['testbed'] = {'localname': tb }
1002
1003        for u in p['user']:
1004            tbparam[tb]['user'] = unpack_id(u['userID'])
1005
1006        for a in e['fedAttr']:
1007            if a['attribute']:
1008                key = translate_attr.get(a['attribute'].lower(), None)
1009                if key:
1010                    tbparam[tb][key]= a['value']
1011       
1012    def release_access(self, tb, aid):
1013        """
1014        Release access to testbed through fedd
1015        """
1016
1017        uri = self.tbmap.get(tb, None)
1018        if not uri:
1019            raise service_error(serice_error.server_config, 
1020                    "Unknown testbed: %s" % tb)
1021
1022        if self.local_access.has_key(uri):
1023            resp = self.local_access[uri].ReleaseAccess(\
1024                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1025                    fedid(file=self.cert_file))
1026            resp = { 'ReleaseAccessResponseBody': resp } 
1027        else:
1028            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1029                    self.cert_file, self.cert_pwd, self.trusted_certs)
1030
1031        # better error coding
1032
1033    def remote_splitter(self, uri, desc, master):
1034
1035        req = {
1036                'description' : { 'ns2description': desc },
1037                'master': master,
1038                'include_fedkit': bool(self.fedkit)
1039            }
1040
1041        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1042                self.trusted_certs)
1043
1044        if r.has_key('Ns2SplitResponseBody'):
1045            r = r['Ns2SplitResponseBody']
1046            if r.has_key('output'):
1047                return r['output'].splitlines()
1048            else:
1049                raise service_error(service_error.protocol, 
1050                        "Bad splitter response (no output)")
1051        else:
1052            raise service_error(service_error.protocol, "Bad splitter response")
1053       
1054    class current_testbed:
1055        """
1056        Object for collecting the current testbed description.  The testbed
1057        description is saved to a file with the local testbed variables
1058        subsittuted line by line.
1059        """
1060        def __init__(self, eid, tmpdir, fedkit):
1061            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1062            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1063            self.current_testbed = None
1064            self.testbed_file = None
1065
1066            self.def_expstart = \
1067                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1068            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1069            self.def_gwstart = \
1070                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1071            self.def_mgwstart = \
1072                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1073            self.def_gwimage = "FBSD61-TUNNEL2";
1074            self.def_gwtype = "pc";
1075
1076            self.eid = eid
1077            self.tmpdir = tmpdir
1078            self.fedkit = fedkit
1079
1080        def __call__(self, line, master, allocated, tbparams):
1081            # Capture testbed topology descriptions
1082            if self.current_testbed == None:
1083                m = self.begin_testbed.match(line)
1084                if m != None:
1085                    self.current_testbed = m.group(1)
1086                    if self.current_testbed == None:
1087                        raise service_error(service_error.req,
1088                                "Bad request format (unnamed testbed)")
1089                    allocated[self.current_testbed] = \
1090                            allocated.get(self.current_testbed,0) + 1
1091                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1092                    if not os.path.exists(tb_dir):
1093                        try:
1094                            os.mkdir(tb_dir)
1095                        except IOError:
1096                            raise service_error(service_error.internal,
1097                                    "Cannot create %s" % tb_dir)
1098                    try:
1099                        self.testbed_file = open("%s/%s.%s.tcl" %
1100                                (tb_dir, self.eid, self.current_testbed), 'w')
1101                    except IOError:
1102                        self.testbed_file = None
1103                    return True
1104                else: return False
1105            else:
1106                m = self.end_testbed.match(line)
1107                if m != None:
1108                    if m.group(1) != self.current_testbed:
1109                        raise service_error(service_error.internal, 
1110                                "Mismatched testbed markers!?")
1111                    if self.testbed_file != None: 
1112                        self.testbed_file.close()
1113                        self.testbed_file = None
1114                    self.current_testbed = None
1115                elif self.testbed_file:
1116                    # Substitute variables and put the line into the local
1117                    # testbed file.
1118                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1119                            self.def_gwtype)
1120                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1121                            self.def_gwimage)
1122                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1123                            self.def_mgwstart)
1124                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1125                            self.def_mexpstart)
1126                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1127                            self.def_gwstart)
1128                    expstart = tbparams[self.current_testbed].get('expstart', 
1129                            self.def_expstart)
1130                    project = tbparams[self.current_testbed].get('project')
1131                    line = re.sub("GWTYPE", gwtype, line)
1132                    line = re.sub("GWIMAGE", gwimage, line)
1133                    if self.current_testbed == master:
1134                        line = re.sub("GWSTART", mgwstart, line)
1135                        line = re.sub("EXPSTART", mexpstart, line)
1136                    else:
1137                        line = re.sub("GWSTART", gwstart, line)
1138                        line = re.sub("EXPSTART", expstart, line)
1139                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1140                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1141                    line = re.sub("EID", self.eid, line)
1142                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1143                            (project, self.eid), line)
1144                    if self.fedkit:
1145                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1146                                line)
1147                    print >>self.testbed_file, line
1148                return True
1149
1150    class allbeds:
1151        """
1152        Process the Allbeds section.  Get access to each federant and save the
1153        parameters in tbparams
1154        """
1155        def __init__(self, get_access):
1156            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1157            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1158            self.in_allbeds = False
1159            self.get_access = get_access
1160
1161        def __call__(self, line, user, tbparams, master, export_project,
1162                access_user):
1163            # Testbed access parameters
1164            if not self.in_allbeds:
1165                if self.begin_allbeds.match(line):
1166                    self.in_allbeds = True
1167                    return True
1168                else:
1169                    return False
1170            else:
1171                if self.end_allbeds.match(line):
1172                    self.in_allbeds = False
1173                else:
1174                    nodes = line.split('|')
1175                    tb = nodes.pop(0)
1176                    self.get_access(tb, nodes, user, tbparams, master,
1177                            export_project, access_user)
1178                return True
1179
1180    class gateways:
1181        def __init__(self, eid, master, tmpdir, gw_pubkey,
1182                gw_secretkey, copy_file, fedkit):
1183            self.begin_gateways = \
1184                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1185            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1186            self.current_gateways = None
1187            self.control_gateway = None
1188            self.active_end = { }
1189
1190            self.eid = eid
1191            self.master = master
1192            self.tmpdir = tmpdir
1193            self.gw_pubkey_base = gw_pubkey
1194            self.gw_secretkey_base = gw_secretkey
1195
1196            self.copy_file = copy_file
1197            self.fedkit = fedkit
1198
1199
1200        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1201                active_end, tbparams, dtb, myname, desthost, type):
1202            """
1203            Produce a gateway configuration file from a gateways line.
1204            """
1205
1206            sproject = tbparams[gw].get('project', 'project')
1207            dproject = tbparams[dtb].get('project', 'project')
1208            sdomain = ".%s.%s%s" % (eid, sproject,
1209                    tbparams[gw].get('domain', ".example.com"))
1210            ddomain = ".%s.%s%s" % (eid, dproject,
1211                    tbparams[dtb].get('domain', ".example.com"))
1212            boss = tbparams[master].get('boss', "boss")
1213            fs = tbparams[master].get('fs', "fs")
1214            event_server = "%s%s" % \
1215                    (tbparams[gw].get('eventserver', "event_server"),
1216                            tbparams[gw].get('domain', "example.com"))
1217            remote_event_server = "%s%s" % \
1218                    (tbparams[dtb].get('eventserver', "event_server"),
1219                            tbparams[dtb].get('domain', "example.com"))
1220            seer_control = "%s%s" % \
1221                    (tbparams[gw].get('control', "control"), sdomain)
1222            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1223
1224            if self.fedkit:
1225                remote_script_dir = "/usr/local/federation/bin"
1226                local_script_dir = "/usr/local/federation/bin"
1227            else:
1228                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1229                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1230
1231            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1232            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1233            tunnel_cfg = tbparams[gw].get("tun", "false")
1234
1235            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1236            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1237
1238            # translate to lower case so the `hostname` hack for specifying
1239            # configuration files works.
1240            conf_file = conf_file.lower();
1241            remote_conf_file = remote_conf_file.lower();
1242
1243            if dtb == master:
1244                active = "false"
1245            elif gw == master:
1246                active = "true"
1247            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1248                active = "false"
1249            else:
1250                active_end['%s-%s' % (gw, dtb)] = 1
1251                active = "true"
1252
1253            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1254            print >>gwconfig, "Active: %s" % active
1255            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1256            if tunnel_iface:
1257                print >>gwconfig, "Interface: %s" % tunnel_iface
1258            print >>gwconfig, "BossName: %s" % boss
1259            print >>gwconfig, "FsName: %s" % fs
1260            print >>gwconfig, "EventServerName: %s" % event_server
1261            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1262            print >>gwconfig, "SeerControl: %s" % seer_control
1263            print >>gwconfig, "Type: %s" % type
1264            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1265            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1266                    local_script_dir
1267            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1268            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1269            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1270                    (remote_conf_dir, remote_conf_file)
1271            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1272            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1273            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1274            gwconfig.close()
1275
1276            return active == "true"
1277
1278        def __call__(self, line, allocated, tbparams):
1279            # Process gateways
1280            if not self.current_gateways:
1281                m = self.begin_gateways.match(line)
1282                if m:
1283                    self.current_gateways = m.group(1)
1284                    if allocated.has_key(self.current_gateways):
1285                        # This test should always succeed
1286                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1287                        if not os.path.exists(tb_dir):
1288                            try:
1289                                os.mkdir(tb_dir)
1290                            except IOError:
1291                                raise service_error(service_error.internal,
1292                                        "Cannot create %s" % tb_dir)
1293                    else:
1294                        # XXX
1295                        self.log.error("[gateways]: Ignoring gateways for " + \
1296                                "unknown testbed %s" % self.current_gateways)
1297                        self.current_gateways = None
1298                    return True
1299                else:
1300                    return False
1301            else:
1302                m = self.end_gateways.match(line)
1303                if m :
1304                    if m.group(1) != self.current_gateways:
1305                        raise service_error(service_error.internal,
1306                                "Mismatched gateway markers!?")
1307                    if self.control_gateway:
1308                        try:
1309                            cc = open("%s/%s/client.conf" %
1310                                    (self.tmpdir, self.current_gateways), 'w')
1311                            print >>cc, "ControlGateway: %s" % \
1312                                    self.control_gateway
1313                            if tbparams[self.master].has_key('smbshare'):
1314                                print >>cc, "SMBSHare: %s" % \
1315                                        tbparams[self.master]['smbshare']
1316                            print >>cc, "ProjectUser: %s" % \
1317                                    tbparams[self.master]['user']
1318                            print >>cc, "ProjectName: %s" % \
1319                                    tbparams[self.master]['project']
1320                            cc.close()
1321                        except IOError:
1322                            raise service_error(service_error.internal,
1323                                    "Error creating client config")
1324                        try:
1325                            cc = open("%s/%s/seer.conf" %
1326                                    (self.tmpdir, self.current_gateways),
1327                                    'w')
1328                            if self.current_gateways != self.master:
1329                                print >>cc, "ControlNode: %s" % \
1330                                        self.control_gateway
1331                            print >>cc, "ExperimentID: %s/%s" % \
1332                                    ( tbparams[self.master]['project'], \
1333                                    self.eid )
1334                            cc.close()
1335                        except IOError:
1336                            raise service_error(service_error.internal,
1337                                    "Error creating seer config")
1338                    else:
1339                        debug.error("[gateways]: No control gateway for %s" %\
1340                                    self.current_gateways)
1341                    self.current_gateways = None
1342                else:
1343                    dtb, myname, desthost, type = line.split(" ")
1344
1345                    if type == "control" or type == "both":
1346                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1347                                self.eid, 
1348                                tbparams[self.current_gateways]['project'],
1349                                tbparams[self.current_gateways]['domain'])
1350                    try:
1351                        active = self.gateway_conf_file(self.current_gateways,
1352                                self.master, self.eid, self.gw_pubkey_base,
1353                                self.gw_secretkey_base,
1354                                self.active_end, tbparams, dtb, myname,
1355                                desthost, type)
1356                    except IOError, e:
1357                        raise service_error(service_error.internal,
1358                                "Failed to write config file for %s" % \
1359                                        self.current_gateway)
1360           
1361                    gw_pubkey = "%s/keys/%s" % \
1362                            (self.tmpdir, self.gw_pubkey_base)
1363                    gw_secretkey = "%s/keys/%s" % \
1364                            (self.tmpdir, self.gw_secretkey_base)
1365
1366                    pkfile = "%s/%s/%s" % \
1367                            ( self.tmpdir, self.current_gateways, 
1368                                    self.gw_pubkey_base)
1369                    skfile = "%s/%s/%s" % \
1370                            ( self.tmpdir, self.current_gateways, 
1371                                    self.gw_secretkey_base)
1372
1373                    if not os.path.exists(pkfile):
1374                        try:
1375                            self.copy_file(gw_pubkey, pkfile)
1376                        except IOError:
1377                            service_error(service_error.internal,
1378                                    "Failed to copy pubkey file")
1379
1380                    if active and not os.path.exists(skfile):
1381                        try:
1382                            self.copy_file(gw_secretkey, skfile)
1383                        except IOError:
1384                            service_error(service_error.internal,
1385                                    "Failed to copy secretkey file")
1386                return True
1387
1388    class shunt_to_file:
1389        """
1390        Simple class to write data between two regexps to a file.
1391        """
1392        def __init__(self, begin, end, filename):
1393            """
1394            Begin shunting on a match of begin, stop on end, send data to
1395            filename.
1396            """
1397            self.begin = re.compile(begin)
1398            self.end = re.compile(end)
1399            self.in_shunt = False
1400            self.file = None
1401            self.filename = filename
1402
1403        def __call__(self, line):
1404            """
1405            Call this on each line in the input that may be shunted.
1406            """
1407            if not self.in_shunt:
1408                if self.begin.match(line):
1409                    self.in_shunt = True
1410                    try:
1411                        self.file = open(self.filename, "w")
1412                    except:
1413                        self.file = None
1414                        raise
1415                    return True
1416                else:
1417                    return False
1418            else:
1419                if self.end.match(line):
1420                    if self.file: 
1421                        self.file.close()
1422                        self.file = None
1423                    self.in_shunt = False
1424                else:
1425                    if self.file:
1426                        print >>self.file, line
1427                return True
1428
1429    class shunt_to_list:
1430        """
1431        Same interface as shunt_to_file.  Data collected in self.list, one list
1432        element per line.
1433        """
1434        def __init__(self, begin, end):
1435            self.begin = re.compile(begin)
1436            self.end = re.compile(end)
1437            self.in_shunt = False
1438            self.list = [ ]
1439       
1440        def __call__(self, line):
1441            if not self.in_shunt:
1442                if self.begin.match(line):
1443                    self.in_shunt = True
1444                    return True
1445                else:
1446                    return False
1447            else:
1448                if self.end.match(line):
1449                    self.in_shunt = False
1450                else:
1451                    self.list.append(line)
1452                return True
1453
1454    class shunt_to_string:
1455        """
1456        Same interface as shunt_to_file.  Data collected in self.str, all in
1457        one string.
1458        """
1459        def __init__(self, begin, end):
1460            self.begin = re.compile(begin)
1461            self.end = re.compile(end)
1462            self.in_shunt = False
1463            self.str = ""
1464       
1465        def __call__(self, line):
1466            if not self.in_shunt:
1467                if self.begin.match(line):
1468                    self.in_shunt = True
1469                    return True
1470                else:
1471                    return False
1472            else:
1473                if self.end.match(line):
1474                    self.in_shunt = False
1475                else:
1476                    self.str += line
1477                return True
1478
1479    def create_experiment(self, req, fid):
1480        """
1481        The external interface to experiment creation called from the
1482        dispatcher.
1483
1484        Creates a working directory, splits the incoming description using the
1485        splitter script and parses out the avrious subsections using the
1486        lcasses above.  Once each sub-experiment is created, use pooled threads
1487        to instantiate them and start it all up.
1488        """
1489
1490        if not self.auth.check_attribute(fid, 'create'):
1491            raise service_error(service_error.access, "Create access denied")
1492
1493        try:
1494            tmpdir = tempfile.mkdtemp(prefix="split-")
1495        except IOError:
1496            raise service_error(service_error.internal, "Cannot create tmp dir")
1497
1498        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1499        gw_secretkey_base = "fed.%s" % self.ssh_type
1500        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1501        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1502        tclfile = tmpdir + "/experiment.tcl"
1503        tbparams = { }
1504        try:
1505            access_user = self.accessdb[fid]
1506        except KeyError:
1507            raise service_error(service_error.internal,
1508                    "Access map and authorizer out of sync in " + \
1509                            "create_experiment for fedid %s"  % fid)
1510
1511        pid = "dummy"
1512        gid = "dummy"
1513        # XXX
1514        fail_soft = False
1515
1516        try:
1517            os.mkdir(tmpdir+"/keys")
1518        except OSError:
1519            raise service_error(service_error.internal,
1520                    "Can't make temporary dir")
1521
1522        req = req.get('CreateRequestBody', None)
1523        if not req:
1524            raise service_error(service_error.req,
1525                    "Bad request format (no CreateRequestBody)")
1526        # The tcl parser needs to read a file so put the content into that file
1527        descr=req.get('experimentdescription', None)
1528        if descr:
1529            file_content=descr.get('ns2description', None)
1530            if file_content:
1531                try:
1532                    f = open(tclfile, 'w')
1533                    f.write(file_content)
1534                    f.close()
1535                except IOError:
1536                    raise service_error(service_error.internal,
1537                            "Cannot write temp experiment description")
1538            else:
1539                raise service_error(service_error.req, 
1540                        "Only ns2descriptions supported")
1541        else:
1542            raise service_error(service_error.req, "No experiment description")
1543
1544        if req.has_key('experimentID') and \
1545                req['experimentID'].has_key('localname'):
1546            eid = req['experimentID']['localname']
1547            self.state_lock.acquire()
1548            while (self.state.has_key(eid)):
1549                eid += random.choice(string.ascii_letters)
1550            # To avoid another thread picking this localname
1551            self.state[eid] = "placeholder"
1552            self.state_lock.release()
1553        else:
1554            eid = self.exp_stem
1555            for i in range(0,5):
1556                eid += random.choice(string.ascii_letters)
1557            self.state_lock.acquire()
1558            while (self.state.has_key(eid)):
1559                eid = self.exp_stem
1560                for i in range(0,5):
1561                    eid += random.choice(string.ascii_letters)
1562            # To avoid another thread picking this localname
1563            self.state[eid] = "placeholder"
1564            self.state_lock.release()
1565
1566        try: 
1567            # This catches exceptions to clear the placeholder if necessary
1568            try:
1569                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1570            except ValueError:
1571                raise service_error(service_error.server_config, 
1572                        "Bad key type (%s)" % self.ssh_type)
1573
1574            user = req.get('user', None)
1575            if user == None:
1576                raise service_error(service_error.req, "No user")
1577
1578            master = req.get('master', None)
1579            if not master:
1580                raise service_error(service_error.req,
1581                        "No master testbed label")
1582            export_project = req.get('exportProject', None)
1583            if not export_project:
1584                raise service_error(service_error.req, "No export project")
1585           
1586            if self.splitter_url:
1587                self.log.debug("Calling remote splitter at %s" % \
1588                        self.splitter_url)
1589                split_data = self.remote_splitter(self.splitter_url,
1590                        file_content, master)
1591            else:
1592                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1593                    str(self.muxmax), '-m', master]
1594
1595                if self.fedkit:
1596                    tclcmd.append('-k')
1597
1598                tclcmd.extend([pid, gid, eid, tclfile])
1599
1600                self.log.debug("running local splitter %s", " ".join(tclcmd))
1601                tclparser = Popen(tclcmd, stdout=PIPE)
1602                split_data = tclparser.stdout
1603
1604            allocated = { }         # Testbeds we can access
1605            started = { }           # Testbeds where a sub-experiment started
1606                                # successfully
1607
1608            # Objects to parse the splitter output (defined above)
1609            parse_current_testbed = self.current_testbed(eid, tmpdir,
1610                    self.fedkit)
1611            parse_allbeds = self.allbeds(self.get_access)
1612            parse_gateways = self.gateways(eid, master, tmpdir,
1613                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1614                    self.fedkit)
1615            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1616                        "^#\s+End\s+Vtopo")
1617            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1618                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1619            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1620                    "^#\s+End\s+tarfiles")
1621            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1622                    "^#\s+End\s+rpms")
1623
1624            # Working on the split data
1625            for line in split_data:
1626                line = line.rstrip()
1627                if parse_current_testbed(line, master, allocated, tbparams):
1628                    continue
1629                elif parse_allbeds(line, user, tbparams, master, export_project,
1630                        access_user):
1631                    continue
1632                elif parse_gateways(line, allocated, tbparams):
1633                    continue
1634                elif parse_vtopo(line):
1635                    continue
1636                elif parse_hostnames(line):
1637                    continue
1638                elif parse_tarfiles(line):
1639                    continue
1640                elif parse_rpms(line):
1641                    continue
1642                else:
1643                    raise service_error(service_error.internal, 
1644                            "Bad tcl parse? %s" % line)
1645            # Virtual topology and visualization
1646            vtopo = self.gentopo(parse_vtopo.str)
1647            if not vtopo:
1648                raise service_error(service_error.internal, 
1649                        "Failed to generate virtual topology")
1650
1651            vis = self.genviz(vtopo)
1652            if not vis:
1653                raise service_error(service_error.internal, 
1654                        "Failed to generate visualization")
1655           
1656            # save federant information
1657            for k in allocated.keys():
1658                tbparams[k]['federant'] = {\
1659                        'name': [ { 'localname' : eid} ],\
1660                        'emulab': tbparams[k]['emulab'],\
1661                        'allocID' : tbparams[k]['allocID'],\
1662                        'master' : k == master,\
1663                    }
1664
1665
1666            # Copy tarfiles and rpms needed at remote sites into a staging area
1667            try:
1668                if self.fedkit:
1669                    parse_tarfiles.list.append(self.fedkit)
1670                for t in parse_tarfiles.list:
1671                    if not os.path.exists("%s/tarfiles" % tmpdir):
1672                        os.mkdir("%s/tarfiles" % tmpdir)
1673                    self.copy_file(t, "%s/tarfiles/%s" % \
1674                            (tmpdir, os.path.basename(t)))
1675                for r in parse_rpms.list:
1676                    if not os.path.exists("%s/rpms" % tmpdir):
1677                        os.mkdir("%s/rpms" % tmpdir)
1678                    self.copy_file(r, "%s/rpms/%s" % \
1679                            (tmpdir, os.path.basename(r)))
1680            except IOError, e:
1681                raise service_error(service_error.internal, 
1682                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1683
1684        except service_error, e:
1685            # If something goes wrong in the parse (usually an access error)
1686            # clear the placeholder state.  From here on out the code delays
1687            # exceptions.
1688            self.state_lock.acquire()
1689            del self.state[eid]
1690            self.state_lock.release()
1691            raise e
1692
1693        thread_pool_info = self.thread_pool()
1694        threads = [ ]
1695
1696        for tb in [ k for k in allocated.keys() if k != master]:
1697            # Wait until we have a free slot to start the next testbed load
1698            thread_pool_info.acquire()
1699            while thread_pool_info.started - \
1700                    thread_pool_info.terminated >= self.nthreads:
1701                thread_pool_info.wait()
1702            thread_pool_info.release()
1703
1704            # Create and start a thread to start the segment, and save it to
1705            # get the return value later
1706            t  = self.pooled_thread(target=self.start_segment, 
1707                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1708                    pdata=thread_pool_info, trace_file=self.trace_file)
1709            threads.append(t)
1710            t.start()
1711
1712        # Wait until all finish (the first clause of the while is to make sure
1713        # one starts)
1714        thread_pool_info.acquire()
1715        while thread_pool_info.started == 0 or \
1716                thread_pool_info.started > thread_pool_info.terminated:
1717            thread_pool_info.wait()
1718        thread_pool_info.release()
1719
1720        # If none failed, start the master
1721        failed = [ t.getName() for t in threads if not t.rv ]
1722
1723        if len(failed) == 0:
1724            if not self.start_segment(master, eid, tbparams, tmpdir):
1725                failed.append(master)
1726
1727        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1728        # If one failed clean up, unless fail_soft is set
1729        if failed:
1730            if not fail_soft:
1731                for tb in succeeded:
1732                    self.stop_segment(tb, eid, tbparams)
1733                # release the allocations
1734                for tb in tbparams.keys():
1735                    self.release_access(tb, tbparams[tb]['allocID'])
1736                # Remove the placeholder
1737                self.state_lock.acquire()
1738                del self.state[eid]
1739                self.state_lock.release()
1740
1741                raise service_error(service_error.federant,
1742                    "Swap in failed on %s" % ",".join(failed))
1743        else:
1744            self.log.info("[start_segment]: Experiment %s started" % eid)
1745
1746        # Generate an ID for the experiment (slice) and a certificate that the
1747        # allocator can use to prove they own it.  We'll ship it back through
1748        # the encrypted connection.
1749        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1750
1751        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1752
1753        # Walk up tmpdir, deleting as we go
1754        for path, dirs, files in os.walk(tmpdir, topdown=False):
1755            for f in files:
1756                os.remove(os.path.join(path, f))
1757            for d in dirs:
1758                os.rmdir(os.path.join(path, d))
1759        os.rmdir(tmpdir)
1760
1761        # The deepcopy prevents the allocation ID and other binaries from being
1762        # translated into other formats
1763        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1764                for tb in tbparams.keys() \
1765                    if tbparams[tb].has_key('federant') ],\
1766                    'vtopo': vtopo,\
1767                    'vis' : vis,
1768                    'experimentID' : [\
1769                            { 'fedid': copy.copy(expid) }, \
1770                            { 'localname': eid },\
1771                        ],\
1772                    'experimentAccess': { 'X509' : expcert },\
1773                }
1774
1775        # Insert the experiment into our state and update the disk copy
1776        self.state_lock.acquire()
1777        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1778                for tb in tbparams.keys() \
1779                    if tbparams[tb].has_key('federant') ],\
1780                    'vtopo': vtopo,\
1781                    'vis' : vis,
1782                    'owner': fid,
1783                    'experimentID' : [\
1784                            { 'fedid': expid }, { 'localname': eid },\
1785                        ],\
1786                }
1787        self.state[eid] = self.state[expid]
1788        if self.state_filename: self.write_state()
1789        self.state_lock.release()
1790
1791        self.auth.set_attribute(fid, expid)
1792        self.auth.set_attribute(expid, expid)
1793
1794        if not failed:
1795            return resp
1796        else:
1797            raise service_error(service_error.partial, \
1798                    "Partial swap in on %s" % ",".join(succeeded))
1799
1800    def check_experiment_access(self, fid, key):
1801        """
1802        Confirm that the fid has access to the experiment.  Though a request
1803        may be made in terms of a local name, the access attribute is always
1804        the experiment's fedid.
1805        """
1806        if not isinstance(key, fedid):
1807            self.state_lock.acquire()
1808            if self.state.has_key(key):
1809                try:
1810                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1811                            if f.has_key('fedid') ]
1812                except KeyError:
1813                    self.state_lock.release()
1814                    raise service_error(service_error.internal, 
1815                            "No fedid for experiment %s when checking " +\
1816                                    "access(!?)" % key)
1817                if len(kl) == 1:
1818                    key = kl[0]
1819                else:
1820                    self.state_lock.release()
1821                    raise service_error(service_error.internal, 
1822                            "multiple fedids for experiment %s when " +\
1823                                    "checking access(!?)" % key)
1824            else:
1825                self.state_lock.release()
1826                raise service_error(service_error.access, "Access Denied")
1827            self.state_lock.release()
1828
1829        if self.auth.check_attribute(fid, key):
1830            return True
1831        else:
1832            raise service_error(service_error.access, "Access Denied")
1833
1834
1835
1836    def get_vtopo(self, req, fid):
1837        """
1838        Return the stored virtual topology for this experiment
1839        """
1840        rv = None
1841
1842        req = req.get('VtopoRequestBody', None)
1843        if not req:
1844            raise service_error(service_error.req,
1845                    "Bad request format (no VtopoRequestBody)")
1846        exp = req.get('experiment', None)
1847        if exp:
1848            if exp.has_key('fedid'):
1849                key = exp['fedid']
1850                keytype = "fedid"
1851            elif exp.has_key('localname'):
1852                key = exp['localname']
1853                keytype = "localname"
1854            else:
1855                raise service_error(service_error.req, "Unknown lookup type")
1856        else:
1857            raise service_error(service_error.req, "No request?")
1858
1859        self.check_experiment_access(fid, key)
1860
1861        self.state_lock.acquire()
1862        if self.state.has_key(key):
1863            rv = { 'experiment' : {keytype: key },\
1864                    'vtopo': self.state[key]['vtopo'],\
1865                }
1866        self.state_lock.release()
1867
1868        if rv: return rv
1869        else: raise service_error(service_error.req, "No such experiment")
1870
1871    def get_vis(self, req, fid):
1872        """
1873        Return the stored visualization for this experiment
1874        """
1875        rv = None
1876
1877        req = req.get('VisRequestBody', None)
1878        if not req:
1879            raise service_error(service_error.req,
1880                    "Bad request format (no VisRequestBody)")
1881        exp = req.get('experiment', None)
1882        if exp:
1883            if exp.has_key('fedid'):
1884                key = exp['fedid']
1885                keytype = "fedid"
1886            elif exp.has_key('localname'):
1887                key = exp['localname']
1888                keytype = "localname"
1889            else:
1890                raise service_error(service_error.req, "Unknown lookup type")
1891        else:
1892            raise service_error(service_error.req, "No request?")
1893
1894        self.check_experiment_access(fid, key)
1895
1896        self.state_lock.acquire()
1897        if self.state.has_key(key):
1898            rv =  { 'experiment' : {keytype: key },\
1899                    'vis': self.state[key]['vis'],\
1900                    }
1901        self.state_lock.release()
1902
1903        if rv: return rv
1904        else: raise service_error(service_error.req, "No such experiment")
1905
1906    def get_info(self, req, fid):
1907        """
1908        Return all the stored info about this experiment
1909        """
1910        rv = None
1911
1912        req = req.get('InfoRequestBody', None)
1913        if not req:
1914            raise service_error(service_error.req,
1915                    "Bad request format (no VisRequestBody)")
1916        exp = req.get('experiment', None)
1917        if exp:
1918            if exp.has_key('fedid'):
1919                key = exp['fedid']
1920                keytype = "fedid"
1921            elif exp.has_key('localname'):
1922                key = exp['localname']
1923                keytype = "localname"
1924            else:
1925                raise service_error(service_error.req, "Unknown lookup type")
1926        else:
1927            raise service_error(service_error.req, "No request?")
1928
1929        self.check_experiment_access(fid, key)
1930
1931        # The state may be massaged by the service function that called
1932        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1933        # state.
1934        self.state_lock.acquire()
1935        if self.state.has_key(key):
1936            rv = copy.deepcopy(self.state[key])
1937        self.state_lock.release()
1938
1939        if rv: return rv
1940        else: raise service_error(service_error.req, "No such experiment")
1941
1942
1943    def terminate_experiment(self, req, fid):
1944        """
1945        Swap this experiment out on the federants and delete the shared
1946        information
1947        """
1948        tbparams = { }
1949        req = req.get('TerminateRequestBody', None)
1950        if not req:
1951            raise service_error(service_error.req,
1952                    "Bad request format (no TerminateRequestBody)")
1953        exp = req.get('experiment', None)
1954        if exp:
1955            if exp.has_key('fedid'):
1956                key = exp['fedid']
1957                keytype = "fedid"
1958            elif exp.has_key('localname'):
1959                key = exp['localname']
1960                keytype = "localname"
1961            else:
1962                raise service_error(service_error.req, "Unknown lookup type")
1963        else:
1964            raise service_error(service_error.req, "No request?")
1965
1966        self.check_experiment_access(fid, key)
1967
1968        self.state_lock.acquire()
1969        fed_exp = self.state.get(key, None)
1970
1971        if fed_exp:
1972            # This branch of the conditional holds the lock to generate a
1973            # consistent temporary tbparams variable to deallocate experiments.
1974            # It releases the lock to do the deallocations and reacquires it to
1975            # remove the experiment state when the termination is complete.
1976            ids = []
1977            #  experimentID is a list of dicts that are self-describing
1978            #  identifiers.  This finds all the fedids and localnames - the
1979            #  keys of self.state - and puts them into ids.
1980            for id in fed_exp.get('experimentID', []):
1981                if id.has_key('fedid'): ids.append(id['fedid'])
1982                if id.has_key('localname'): ids.append(id['localname'])
1983
1984            # Construct enough of the tbparams to make the stop_segment calls
1985            # work
1986            for fed in fed_exp['federant']:
1987                try:
1988                    for e in fed['name']:
1989                        eid = e.get('localname', None)
1990                        if eid: break
1991                    else:
1992                        continue
1993
1994                    p = fed['emulab']['project']
1995
1996                    project = p['name']['localname']
1997                    tb = p['testbed']['localname']
1998                    user = p['user'][0]['userID']['localname']
1999
2000                    domain = fed['emulab']['domain']
2001                    host  = fed['emulab']['ops']
2002                    aid = fed['allocID']
2003                except KeyError, e:
2004                    continue
2005                tbparams[tb] = {\
2006                        'user': user,\
2007                        'domain': domain,\
2008                        'project': project,\
2009                        'host': host,\
2010                        'eid': eid,\
2011                        'aid': aid,\
2012                    }
2013            self.state_lock.release()
2014
2015            # Stop everyone.
2016            for tb in tbparams.keys():
2017                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2018
2019            # release the allocations
2020            for tb in tbparams.keys():
2021                self.release_access(tb, tbparams[tb]['aid'])
2022
2023            # Remove the terminated experiment
2024            self.state_lock.acquire()
2025            for id in ids:
2026                if self.state.has_key(id): del self.state[id]
2027
2028            if self.state_filename: self.write_state()
2029            self.state_lock.release()
2030
2031            return { 'experiment': exp }
2032        else:
2033            # Don't forget to release the lock
2034            self.state_lock.release()
2035            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.