source: fedd/federation/experiment_control.py @ 338e9a7

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 338e9a7 was 3bce099, checked in by Ted Faber <faber@…>, 16 years ago

properly identify experiment creator

  • Property mode set to 100644
File size: 61.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = config.get("experiment_control", "cert_file")
152        if self.cert_file:
153            self.cert_pwd = config.get("experiment_control", "cert_pwd")
154        else:
155            self.cert_file = config.get("globals", "cert_file")
156            self.cert_pwd = config.get("globals", "cert_pwd")
157
158        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
159                or config.get("globals", "trusted_certs")
160
161        self.exp_stem = "fed-stem"
162        self.log = logging.getLogger("fedd.experiment_control")
163        set_log_level(config, "experiment_control", self.log)
164        self.muxmax = 2
165        self.nthreads = 2
166        self.randomize_experiments = False
167
168        self.scp_exec = "/usr/bin/scp"
169        self.splitter = None
170        self.ssh_exec="/usr/bin/ssh"
171        self.ssh_keygen = "/usr/bin/ssh-keygen"
172        self.ssh_identity_file = None
173
174
175        self.debug = config.getboolean("experiment_control", "create_debug")
176        self.state_filename = config.get("experiment_control", 
177                "experiment_state_file")
178        self.splitter_url = config.get("experiment_control", "splitter_url")
179        self.fedkit = config.get("experiment_control", "fedkit")
180        accessdb_file = config.get("experiment_control", "accessdb")
181
182        self.ssh_pubkey_file = config.get("experiment_control", 
183                "ssh_pubkey_file")
184        self.ssh_privkey_file = config.get("experiment_control",
185                "ssh_privkey_file")
186        # NB for internal master/slave ops, not experiment setup
187        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
188        self.state = { }
189        self.state_lock = Lock()
190        self.tclsh = "/usr/local/bin/otclsh"
191        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
192                config.get("experiment_control", "tcl_splitter",
193                        "/usr/testbed/lib/ns2ir/parse.tcl")
194        mapdb_file = config.get("experiment_control", "mapdb")
195        self.trace_file = sys.stderr
196
197        self.def_expstart = \
198                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
199                "/tmp/federate";
200        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
201                "FEDDIR/hosts";
202        self.def_gwstart = \
203                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
204                "/tmp/bridge.log";
205        self.def_mgwstart = \
206                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
207                "/tmp/bridge.log";
208        self.def_gwimage = "FBSD61-TUNNEL2";
209        self.def_gwtype = "pc";
210        self.local_access = { }
211
212        if auth:
213            self.auth = auth
214        else:
215            self.log.error(\
216                    "[access]: No authorizer initialized, creating local one.")
217            auth = authorizer()
218
219
220        if self.ssh_pubkey_file:
221            try:
222                f = open(self.ssh_pubkey_file, 'r')
223                self.ssh_pubkey = f.read()
224                f.close()
225            except IOError:
226                raise service_error(service_error.internal,
227                        "Cannot read sshpubkey")
228        else:
229            raise service_error(service_error.internal, 
230                    "No SSH public key file?")
231
232        if not self.ssh_privkey_file:
233            raise service_error(service_error.internal, 
234                    "No SSH public key file?")
235
236
237        if mapdb_file:
238            self.read_mapdb(mapdb_file)
239        else:
240            self.log.warn("[experiment_control] No testbed map, using defaults")
241            self.tbmap = { 
242                    'deter':'https://users.isi.deterlab.net:23235',
243                    'emulab':'https://users.isi.deterlab.net:23236',
244                    'ucb':'https://users.isi.deterlab.net:23237',
245                    }
246
247        if accessdb_file:
248                self.read_accessdb(accessdb_file)
249        else:
250            raise service_error(service_error.internal,
251                    "No accessdb specified in config")
252
253        # Grab saved state.  OK to do this w/o locking because it's read only
254        # and only one thread should be in existence that can see self.state at
255        # this point.
256        if self.state_filename:
257            self.read_state()
258
259        # Dispatch tables
260        self.soap_services = {\
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'Terminate': soap_handler('Terminate', 
266                    self.terminate_experiment),
267        }
268
269        self.xmlrpc_services = {\
270                'Create': xmlrpc_handler('Create', self.create_experiment),
271                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
272                'Vis': xmlrpc_handler('Vis', self.get_vis),
273                'Info': xmlrpc_handler('Info', self.get_info),
274                'Terminate': xmlrpc_handler('Terminate',
275                    self.terminate_experiment),
276        }
277
278    def copy_file(self, src, dest, size=1024):
279        """
280        Exceedingly simple file copy.
281        """
282        s = open(src,'r')
283        d = open(dest, 'w')
284
285        buf = "x"
286        while buf != "":
287            buf = s.read(size)
288            d.write(buf)
289        s.close()
290        d.close()
291
292    # Call while holding self.state_lock
293    def write_state(self):
294        """
295        Write a new copy of experiment state after copying the existing state
296        to a backup.
297
298        State format is a simple pickling of the state dictionary.
299        """
300        if os.access(self.state_filename, os.W_OK):
301            self.copy_file(self.state_filename, \
302                    "%s.bak" % self.state_filename)
303        try:
304            f = open(self.state_filename, 'w')
305            pickle.dump(self.state, f)
306        except IOError, e:
307            self.log.error("Can't write file %s: %s" % \
308                    (self.state_filename, e))
309        except pickle.PicklingError, e:
310            self.log.error("Pickling problem: %s" % e)
311        except TypeError, e:
312            self.log.error("Pickling problem (TypeError): %s" % e)
313
314    # Call while holding self.state_lock
315    def read_state(self):
316        """
317        Read a new copy of experiment state.  Old state is overwritten.
318
319        State format is a simple pickling of the state dictionary.
320        """
321        try:
322            f = open(self.state_filename, "r")
323            self.state = pickle.load(f)
324            self.log.debug("[read_state]: Read state from %s" % \
325                    self.state_filename)
326        except IOError, e:
327            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
328                    % (self.state_filename, e))
329        except pickle.UnpicklingError, e:
330            self.log.warning(("[read_state]: No saved state: " + \
331                    "Unpickling failed: %s") % e)
332       
333        for k in self.state.keys():
334            try:
335                # This list should only have one element in it, but phrasing it
336                # as a for loop doesn't cost much, really.  We have to find the
337                # fedid elements anyway.
338                for eid in [ f['fedid'] \
339                        for f in self.state[k]['experimentID']\
340                            if f.has_key('fedid') ]:
341                    self.auth.set_attribute(self.state[k]['owner'], eid)
342            except KeyError, e:
343                self.log.warning("[read_state]: State ownership or identity " +\
344                        "misformatted in %s: %s" % (self.state_filename, e))
345
346
347    def read_accessdb(self, accessdb_file):
348        """
349        Read the mapping from fedids that can create experiments to their name
350        in the 3-level access namespace.  All will be asserted from this
351        testbed and can include the local username and porject that will be
352        asserted on their behalf by this fedd.  Each fedid is also added to the
353        authorization system with the "create" attribute.
354        """
355        self.accessdb = {}
356        # These are the regexps for parsing the db
357        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
358        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
359                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
360        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
361                "\s*->\s*(" + name_expr + ")\s*$")
362        lineno = 0
363
364        # Parse the mappings and store in self.authdb, a dict of
365        # fedid -> (proj, user)
366        try:
367            f = open(accessdb_file, "r")
368            for line in f:
369                lineno += 1
370                line = line.strip()
371                if len(line) == 0 or line.startswith('#'):
372                    continue
373                m = project_line.match(line)
374                if m:
375                    fid = fedid(hexstr=m.group(1))
376                    project, user = m.group(2,3)
377                    if not self.accessdb.has_key(fid):
378                        self.accessdb[fid] = []
379                    self.accessdb[fid].append((project, user))
380                    continue
381
382                m = user_line.match(line)
383                if m:
384                    fid = fedid(hexstr=m.group(1))
385                    project = None
386                    user = m.group(2)
387                    if not self.accessdb.has_key(fid):
388                        self.accessdb[fid] = []
389                    self.accessdb[fid].append((project, user))
390                    continue
391                self.log.warn("[experiment_control] Error parsing access " +\
392                        "db %s at line %d" %  (accessdb_file, lineno))
393        except IOError:
394            raise service_error(service_error.internal,
395                    "Error opening/reading %s as experiment " +\
396                            "control accessdb" %  accessdb_file)
397        f.close()
398
399        # Initialize the authorization attributes
400        for fid in self.accessdb.keys():
401            self.auth.set_attribute(fid, 'create')
402
403    def read_mapdb(self, file):
404        """
405        Read a simple colon separated list of mappings for the
406        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
407        """
408
409        self.tbmap = { }
410        lineno =0
411        try:
412            f = open(file, "r")
413            for line in f:
414                lineno += 1
415                line = line.strip()
416                if line.startswith('#') or len(line) == 0:
417                    continue
418                try:
419                    label, url = line.split(':', 1)
420                    self.tbmap[label] = url
421                except ValueError, e:
422                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
423                            "map db: %s %s" % (lineno, line, e))
424        except IOError, e:
425            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
426                    "open %s: %s" % (file, e))
427        f.close()
428
429    def scp_file(self, file, user, host, dest=""):
430        """
431        scp a file to the remote host.  If debug is set the action is only
432        logged.
433        """
434
435        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', '-i', 
436                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
437        rv = 0
438
439        try:
440            dnull = open("/dev/null", "w")
441        except IOError:
442            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
443            dnull = Null
444
445        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
446        if not self.debug:
447            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
448
449        return rv == 0
450
451    def ssh_cmd(self, user, host, cmd, wname=None):
452        """
453        Run a remote command on host as user.  If debug is set, the action is
454        only logged.
455        """
456        sh_str = "%s -o 'IdentitiesOnly yes' -i %s %s@%s %s" % \
457                (self.ssh_exec, self.ssh_privkey_file, 
458                        user, host, cmd)
459
460        try:
461            dnull = open("/dev/null", "r")
462        except IOError:
463            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
464            dnull = Null
465
466        self.log.debug("[ssh_cmd]: %s" % sh_str)
467        if not self.debug:
468            if dnull:
469                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
470            else:
471                sub = Popen(sh_str, shell=True)
472            return sub.wait() == 0
473        else:
474            return True
475
476    def ship_configs(self, host, user, src_dir, dest_dir):
477        """
478        Copy federant-specific configuration files to the federant.
479        """
480        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
481            return False
482        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
483            return False
484
485        for f in os.listdir(src_dir):
486            if os.path.isdir(f):
487                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
488                        "%s/%s" % (dest_dir, f)):
489                    return False
490            else:
491                if not self.scp_file("%s/%s" % (src_dir, f), 
492                        user, host, dest_dir):
493                    return False
494        return True
495
496    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
497        """
498        Start a sub-experiment on a federant.
499
500        Get the current state, modify or create as appropriate, ship data and
501        configs and start the experiment.  There are small ordering differences
502        based on the initial state of the sub-experiment.
503        """
504        # ops node in the federant
505        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
506        user = tbparams[tb]['user']     # federant user
507        pid = tbparams[tb]['project']   # federant project
508        # XXX
509        base_confs = ( "hosts",)
510        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
511        # command to test experiment state
512        expinfo_exec = "/usr/testbed/bin/expinfo" 
513        # Configuration directories on the remote machine
514        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
515        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
516        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
517        # Regular expressions to parse the expinfo response
518        state_re = re.compile("State:\s+(\w+)")
519        no_exp_re = re.compile("^No\s+such\s+experiment")
520        state = None    # Experiment state parsed from expinfo
521        # The expinfo ssh command.  Note the identity restriction to use only
522        # the identity provided in the pubkey given.
523        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-i', 
524                self.ssh_privkey_file, "%s@%s" % (user, host), 
525                expinfo_exec, pid, eid]
526
527        # Get status
528        self.log.debug("[start_segment]: %s"% " ".join(cmd))
529        dev_null = None
530        try:
531            dev_null = open("/dev/null", "a")
532        except IOError, e:
533            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
534
535        if self.debug:
536            state = 'swapped'
537            rv = 0
538        else:
539            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
540            for line in status.stdout:
541                m = state_re.match(line)
542                if m: state = m.group(1)
543                else:
544                    m = no_exp_re.match(line)
545                    if m: state = "none"
546            rv = status.wait()
547
548        # If the experiment is not present the subcommand returns a non-zero
549        # return value.  If we successfully parsed a "none" outcome, ignore the
550        # return code.
551        if rv != 0 and state != "none":
552            raise service_error(service_error.internal,
553                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
554
555        self.log.debug("[start_segment]: %s: %s" % (tb, state))
556        self.log.info("[start_segment]:transferring experiment to %s" % tb)
557
558        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
559            return False
560        # Clear the federation config dirs
561        if not self.ssh_cmd(user, host, 
562                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
563            return False
564        # Clear and create the tarfiles and rpm directories
565        for d in (tarfiles_dir, rpms_dir):
566            if not self.ssh_cmd(user, host, 
567                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
568                return False
569            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
570                    "create tarfiles"):
571                return False
572       
573        if state == 'active':
574            # Create the federation config dirs (do not move outside the
575            # conditional.  Happens later in new expriment creation)
576            if not self.ssh_cmd(user, host, 
577                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
578                return False
579            # Remote experiment is active.  Modify it.
580            for f in base_confs:
581                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
582                        "%s/%s" % (proj_dir, f)):
583                    return False
584            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
585                    proj_dir):
586                return False
587            if os.path.isdir("%s/tarfiles" % tmpdir):
588                if not self.ship_configs(host, user,
589                        "%s/tarfiles" % tmpdir, tarfiles_dir):
590                    return False
591            if os.path.isdir("%s/rpms" % tmpdir):
592                if not self.ship_configs(host, user,
593                        "%s/rpms" % tmpdir, tarfiles_dir):
594                    return False
595            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
596            if not self.ssh_cmd(user, host,
597                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
598                            (pid, eid, tclfile), "modexp"):
599                return False
600            return True
601        elif state == "swapped":
602            # Create the federation config dirs (do not move outside the
603            # conditional.  Happens later in new expriment creation)
604            if not self.ssh_cmd(user, host, 
605                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
606                return False
607            # Remote experiment swapped out.  Modify it and swap it in.
608            for f in base_confs:
609                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
610                        "%s/%s" % (proj_dir, f)):
611                    return False
612            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
613                    proj_dir):
614                return False
615            if os.path.isdir("%s/tarfiles" % tmpdir):
616                if not self.ship_configs(host, user,
617                        "%s/tarfiles" % tmpdir, tarfiles_dir):
618                    return False
619            if os.path.isdir("%s/rpms" % tmpdir):
620                if not self.ship_configs(host, user,
621                        "%s/rpms" % tmpdir, tarfiles_dir):
622                    return False
623            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
624            if not self.ssh_cmd(user, host,
625                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
626                    "modexp"):
627                return False
628            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
629            if not self.ssh_cmd(user, host,
630                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
631                    "swapexp"):
632                return False
633            return True
634        elif state == "none":
635            # No remote experiment.  Create one.  We do this in 2 steps so we
636            # can put the configuration files and scripts into the new
637            # experiment directories.
638
639            # Tarfiles must be present for creation to work
640            if os.path.isdir("%s/tarfiles" % tmpdir):
641                if not self.ship_configs(host, user,
642                        "%s/tarfiles" % tmpdir, tarfiles_dir):
643                    return False
644            if os.path.isdir("%s/rpms" % tmpdir):
645                if not self.ship_configs(host, user,
646                        "%s/rpms" % tmpdir, tarfiles_dir):
647                    return False
648            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
649            if not self.ssh_cmd(user, host,
650                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
651                            (pid, eid, tclfile), "startexp"):
652                return False
653            # Create the federation config dirs (do not move outside the
654            # conditional.)
655            if not self.ssh_cmd(user, host, 
656                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
657                return False
658            # After startexp the per-experiment directories exist
659            for f in base_confs:
660                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
661                        "%s/%s" % (proj_dir, f)):
662                    return False
663            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
664                    proj_dir):
665                return False
666            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
667            if not self.ssh_cmd(user, host,
668                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
669                    "swapexp"):
670                return False
671            return True
672        else:
673            self.log.debug("[start_segment]:unknown state %s" % state)
674            return False
675
676    def stop_segment(self, tb, eid, tbparams):
677        """
678        Stop a sub experiment by calling swapexp on the federant
679        """
680        user = tbparams[tb]['user']
681        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
682        pid = tbparams[tb]['project']
683
684        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
685        return self.ssh_cmd(user, host,
686                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
687
688       
689    def generate_ssh_keys(self, dest, type="rsa" ):
690        """
691        Generate a set of keys for the gateways to use to talk.
692
693        Keys are of type type and are stored in the required dest file.
694        """
695        valid_types = ("rsa", "dsa")
696        t = type.lower();
697        if t not in valid_types: raise ValueError
698        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
699
700        try:
701            trace = open("/dev/null", "w")
702        except IOError:
703            raise service_error(service_error.internal,
704                    "Cannot open /dev/null??");
705
706        # May raise CalledProcessError
707        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
708        rv = call(cmd, stdout=trace, stderr=trace)
709        if rv != 0:
710            raise service_error(service_error.internal, 
711                    "Cannot generate nonce ssh keys.  %s return code %d" \
712                            % (self.ssh_keygen, rv))
713
714    def gentopo(self, str):
715        """
716        Generate the topology dtat structure from the splitter's XML
717        representation of it.
718
719        The topology XML looks like:
720            <experiment>
721                <nodes>
722                    <node><vname></vname><ips>ip1:ip2</ips></node>
723                </nodes>
724                <lans>
725                    <lan>
726                        <vname></vname><vnode></vnode><ip></ip>
727                        <bandwidth></bandwidth><member>node:port</member>
728                    </lan>
729                </lans>
730        """
731        class topo_parse:
732            """
733            Parse the topology XML and create the dats structure.
734            """
735            def __init__(self):
736                # Typing of the subelements for data conversion
737                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
738                self.int_subelements = ( 'bandwidth',)
739                self.float_subelements = ( 'delay',)
740                # The final data structure
741                self.nodes = [ ]
742                self.lans =  [ ]
743                self.topo = { \
744                        'node': self.nodes,\
745                        'lan' : self.lans,\
746                    }
747                self.element = { }  # Current element being created
748                self.chars = ""     # Last text seen
749
750            def end_element(self, name):
751                # After each sub element the contents is added to the current
752                # element or to the appropriate list.
753                if name == 'node':
754                    self.nodes.append(self.element)
755                    self.element = { }
756                elif name == 'lan':
757                    self.lans.append(self.element)
758                    self.element = { }
759                elif name in self.str_subelements:
760                    self.element[name] = self.chars
761                    self.chars = ""
762                elif name in self.int_subelements:
763                    self.element[name] = int(self.chars)
764                    self.chars = ""
765                elif name in self.float_subelements:
766                    self.element[name] = float(self.chars)
767                    self.chars = ""
768
769            def found_chars(self, data):
770                self.chars += data.rstrip()
771
772
773        tp = topo_parse();
774        parser = xml.parsers.expat.ParserCreate()
775        parser.EndElementHandler = tp.end_element
776        parser.CharacterDataHandler = tp.found_chars
777
778        parser.Parse(str)
779
780        return tp.topo
781       
782
783    def genviz(self, topo):
784        """
785        Generate the visualization the virtual topology
786        """
787
788        neato = "/usr/local/bin/neato"
789        # These are used to parse neato output and to create the visualization
790        # file.
791        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
792        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
793                "%s</type></node>"
794
795        try:
796            # Node names
797            nodes = [ n['vname'] for n in topo['node'] ]
798            topo_lans = topo['lan']
799        except KeyError:
800            raise service_error(service_error.internal, "Bad topology")
801
802        lans = { }
803        links = { }
804
805        # Walk through the virtual topology, organizing the connections into
806        # 2-node connections (links) and more-than-2-node connections (lans).
807        # When a lan is created, it's added to the list of nodes (there's a
808        # node in the visualization for the lan).
809        for l in topo_lans:
810            if links.has_key(l['vname']):
811                if len(links[l['vname']]) < 2:
812                    links[l['vname']].append(l['vnode'])
813                else:
814                    nodes.append(l['vname'])
815                    lans[l['vname']] = links[l['vname']]
816                    del links[l['vname']]
817                    lans[l['vname']].append(l['vnode'])
818            elif lans.has_key(l['vname']):
819                lans[l['vname']].append(l['vnode'])
820            else:
821                links[l['vname']] = [ l['vnode'] ]
822
823
824        # Open up a temporary file for dot to turn into a visualization
825        try:
826            df, dotname = tempfile.mkstemp()
827            dotfile = os.fdopen(df, 'w')
828        except IOError:
829            raise service_error(service_error.internal,
830                    "Failed to open file in genviz")
831
832        # Generate a dot/neato input file from the links, nodes and lans
833        try:
834            print >>dotfile, "graph G {"
835            for n in nodes:
836                print >>dotfile, '\t"%s"' % n
837            for l in links.keys():
838                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
839            for l in lans.keys():
840                for n in lans[l]:
841                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
842            print >>dotfile, "}"
843            dotfile.close()
844        except TypeError:
845            raise service_error(service_error.internal,
846                    "Single endpoint link in vtopo")
847        except IOError:
848            raise service_error(service_error.internal, "Cannot write dot file")
849
850        # Use dot to create a visualization
851        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
852                '-Gpack=true', dotname], stdout=PIPE)
853
854        # Translate dot to vis format
855        vis_nodes = [ ]
856        vis = { 'node': vis_nodes }
857        for line in dot.stdout:
858            m = vis_re.match(line)
859            if m:
860                vn = m.group(1)
861                vis_node = {'name': vn, \
862                        'x': float(m.group(2)),\
863                        'y' : float(m.group(3)),\
864                    }
865                if vn in links.keys() or vn in lans.keys():
866                    vis_node['type'] = 'lan'
867                else:
868                    vis_node['type'] = 'node'
869                vis_nodes.append(vis_node)
870        rv = dot.wait()
871
872        os.remove(dotname)
873        if rv == 0 : return vis
874        else: return None
875
876    def get_access(self, tb, nodes, user, tbparam, master, export_project,
877            access_user):
878        """
879        Get access to testbed through fedd and set the parameters for that tb
880        """
881
882        # XXX This is needless complexity.  It must vanish.
883        translate_attr = {
884            'slavenodestartcmd': 'expstart',
885            'slaveconnectorstartcmd': 'gwstart',
886            'masternodestartcmd': 'mexpstart',
887            'masterconnectorstartcmd': 'mgwstart',
888            'connectorimage': 'gwimage',
889            'connectortype': 'gwtype',
890            'tunnelcfg': 'tun',
891            'tunnelinterface': 'tunnelinterface',
892            'smbshare': 'smbshare',
893        }
894
895        uri = self.tbmap.get(tb, None)
896        if not uri:
897            raise service_error(serice_error.server_config, 
898                    "Unknown testbed: %s" % tb)
899
900        # currently this lumps all users into one service access group
901        service_keys = [ a for u in user \
902                for a in u.get('access', []) \
903                    if a.has_key('sshPubkey')]
904
905        if len(service_keys) == 0:
906            raise service_error(service_error.req, 
907                    "Must have at least one SSH pubkey for services")
908
909
910        for p, u in access_user:
911            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
912                    "to %s") %  ((p or "None"), u, uri))
913
914            if p:
915                # Request with user and project specified
916                req = {\
917                        'destinationTestbed' : { 'uri' : uri },
918                        'project': { 
919                            'name': {'localname': p},
920                            'user': [ {'userID': { 'localname': u } } ],
921                            },
922                        'user':  user,
923                        'allocID' : { 'localname': 'test' },
924                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
925                        'serviceAccess' : service_keys
926                    }
927            else:
928                # Request with only user specified
929                req = {\
930                        'destinationTestbed' : { 'uri' : uri },
931                        'user':  [ {'userID': { 'localname': u } } ],
932                        'allocID' : { 'localname': 'test' },
933                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
934                        'serviceAccess' : service_keys
935                    }
936
937            if tb == master:
938                # NB, the export_project parameter is a dict that includes
939                # the type
940                req['exportProject'] = export_project
941
942            # node resources if any
943            if nodes != None and len(nodes) > 0:
944                rnodes = [ ]
945                for n in nodes:
946                    rn = { }
947                    image, hw, count = n.split(":")
948                    if image: rn['image'] = [ image ]
949                    if hw: rn['hardware'] = [ hw ]
950                    if count and int(count) >0 : rn['count'] = int(count)
951                    rnodes.append(rn)
952                req['resources']= { }
953                req['resources']['node'] = rnodes
954
955            try:
956                if self.local_access.has_key(uri):
957                    # Local access call
958                    req = { 'RequestAccessRequestBody' : req }
959                    r = self.local_access[uri].RequestAccess(req, 
960                            fedid(file=self.cert_file))
961                    r = { 'RequestAccessResponseBody' : r }
962                else:
963                    r = self.call_RequestAccess(uri, req, 
964                            self.cert_file, self.cert_pwd, self.trusted_certs)
965            except service_error, e:
966                if e.code == service_error.access:
967                    self.log.debug("[get_access] Access denied")
968                    r = None
969                    continue
970                else:
971                    raise e
972
973            if r.has_key('RequestAccessResponseBody'):
974                # Through to here we have a valid response, not a fault.
975                # Access denied is a fault, so something better or worse than
976                # access denied has happened.
977                r = r['RequestAccessResponseBody']
978                self.log.debug("[get_access] Access granted")
979                break
980            else:
981                raise service_error(service_error.protocol,
982                        "Bad proxy response")
983       
984        if not r:
985            raise service_error(service_error.access, 
986                    "Access denied by %s (%s)" % (tb, uri))
987
988        e = r['emulab']
989        p = e['project']
990        tbparam[tb] = { 
991                "boss": e['boss'],
992                "host": e['ops'],
993                "domain": e['domain'],
994                "fs": e['fileServer'],
995                "eventserver": e['eventServer'],
996                "project": unpack_id(p['name']),
997                "emulab" : e,
998                "allocID" : r['allocID'],
999                }
1000        # Make the testbed name be the label the user applied
1001        p['testbed'] = {'localname': tb }
1002
1003        for u in p['user']:
1004            role = u.get('role', None)
1005            if role == 'experimentCreation':
1006                tbparam[tb]['user'] = unpack_id(u['userID'])
1007                break
1008        else:
1009            raise service_error(service_error.internal, 
1010                    "No createExperimentUser from %s" %tb)
1011
1012        for a in e['fedAttr']:
1013            if a['attribute']:
1014                key = translate_attr.get(a['attribute'].lower(), None)
1015                if key:
1016                    tbparam[tb][key]= a['value']
1017       
1018    def release_access(self, tb, aid):
1019        """
1020        Release access to testbed through fedd
1021        """
1022
1023        uri = self.tbmap.get(tb, None)
1024        if not uri:
1025            raise service_error(serice_error.server_config, 
1026                    "Unknown testbed: %s" % tb)
1027
1028        if self.local_access.has_key(uri):
1029            resp = self.local_access[uri].ReleaseAccess(\
1030                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1031                    fedid(file=self.cert_file))
1032            resp = { 'ReleaseAccessResponseBody': resp } 
1033        else:
1034            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1035                    self.cert_file, self.cert_pwd, self.trusted_certs)
1036
1037        # better error coding
1038
1039    def remote_splitter(self, uri, desc, master):
1040
1041        req = {
1042                'description' : { 'ns2description': desc },
1043                'master': master,
1044                'include_fedkit': bool(self.fedkit)
1045            }
1046
1047        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1048                self.trusted_certs)
1049
1050        if r.has_key('Ns2SplitResponseBody'):
1051            r = r['Ns2SplitResponseBody']
1052            if r.has_key('output'):
1053                return r['output'].splitlines()
1054            else:
1055                raise service_error(service_error.protocol, 
1056                        "Bad splitter response (no output)")
1057        else:
1058            raise service_error(service_error.protocol, "Bad splitter response")
1059       
1060    class current_testbed:
1061        """
1062        Object for collecting the current testbed description.  The testbed
1063        description is saved to a file with the local testbed variables
1064        subsittuted line by line.
1065        """
1066        def __init__(self, eid, tmpdir, fedkit):
1067            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1068            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1069            self.current_testbed = None
1070            self.testbed_file = None
1071
1072            self.def_expstart = \
1073                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1074            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1075            self.def_gwstart = \
1076                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1077            self.def_mgwstart = \
1078                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1079            self.def_gwimage = "FBSD61-TUNNEL2";
1080            self.def_gwtype = "pc";
1081
1082            self.eid = eid
1083            self.tmpdir = tmpdir
1084            self.fedkit = fedkit
1085
1086        def __call__(self, line, master, allocated, tbparams):
1087            # Capture testbed topology descriptions
1088            if self.current_testbed == None:
1089                m = self.begin_testbed.match(line)
1090                if m != None:
1091                    self.current_testbed = m.group(1)
1092                    if self.current_testbed == None:
1093                        raise service_error(service_error.req,
1094                                "Bad request format (unnamed testbed)")
1095                    allocated[self.current_testbed] = \
1096                            allocated.get(self.current_testbed,0) + 1
1097                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1098                    if not os.path.exists(tb_dir):
1099                        try:
1100                            os.mkdir(tb_dir)
1101                        except IOError:
1102                            raise service_error(service_error.internal,
1103                                    "Cannot create %s" % tb_dir)
1104                    try:
1105                        self.testbed_file = open("%s/%s.%s.tcl" %
1106                                (tb_dir, self.eid, self.current_testbed), 'w')
1107                    except IOError:
1108                        self.testbed_file = None
1109                    return True
1110                else: return False
1111            else:
1112                m = self.end_testbed.match(line)
1113                if m != None:
1114                    if m.group(1) != self.current_testbed:
1115                        raise service_error(service_error.internal, 
1116                                "Mismatched testbed markers!?")
1117                    if self.testbed_file != None: 
1118                        self.testbed_file.close()
1119                        self.testbed_file = None
1120                    self.current_testbed = None
1121                elif self.testbed_file:
1122                    # Substitute variables and put the line into the local
1123                    # testbed file.
1124                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1125                            self.def_gwtype)
1126                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1127                            self.def_gwimage)
1128                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1129                            self.def_mgwstart)
1130                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1131                            self.def_mexpstart)
1132                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1133                            self.def_gwstart)
1134                    expstart = tbparams[self.current_testbed].get('expstart', 
1135                            self.def_expstart)
1136                    project = tbparams[self.current_testbed].get('project')
1137                    line = re.sub("GWTYPE", gwtype, line)
1138                    line = re.sub("GWIMAGE", gwimage, line)
1139                    if self.current_testbed == master:
1140                        line = re.sub("GWSTART", mgwstart, line)
1141                        line = re.sub("EXPSTART", mexpstart, line)
1142                    else:
1143                        line = re.sub("GWSTART", gwstart, line)
1144                        line = re.sub("EXPSTART", expstart, line)
1145                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1146                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1147                    line = re.sub("EID", self.eid, line)
1148                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1149                            (project, self.eid), line)
1150                    if self.fedkit:
1151                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1152                                line)
1153                    print >>self.testbed_file, line
1154                return True
1155
1156    class allbeds:
1157        """
1158        Process the Allbeds section.  Get access to each federant and save the
1159        parameters in tbparams
1160        """
1161        def __init__(self, get_access):
1162            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1163            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1164            self.in_allbeds = False
1165            self.get_access = get_access
1166
1167        def __call__(self, line, user, tbparams, master, export_project,
1168                access_user):
1169            # Testbed access parameters
1170            if not self.in_allbeds:
1171                if self.begin_allbeds.match(line):
1172                    self.in_allbeds = True
1173                    return True
1174                else:
1175                    return False
1176            else:
1177                if self.end_allbeds.match(line):
1178                    self.in_allbeds = False
1179                else:
1180                    nodes = line.split('|')
1181                    tb = nodes.pop(0)
1182                    self.get_access(tb, nodes, user, tbparams, master,
1183                            export_project, access_user)
1184                return True
1185
1186    class gateways:
1187        def __init__(self, eid, master, tmpdir, gw_pubkey,
1188                gw_secretkey, copy_file, fedkit):
1189            self.begin_gateways = \
1190                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1191            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1192            self.current_gateways = None
1193            self.control_gateway = None
1194            self.active_end = { }
1195
1196            self.eid = eid
1197            self.master = master
1198            self.tmpdir = tmpdir
1199            self.gw_pubkey_base = gw_pubkey
1200            self.gw_secretkey_base = gw_secretkey
1201
1202            self.copy_file = copy_file
1203            self.fedkit = fedkit
1204
1205
1206        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1207                active_end, tbparams, dtb, myname, desthost, type):
1208            """
1209            Produce a gateway configuration file from a gateways line.
1210            """
1211
1212            sproject = tbparams[gw].get('project', 'project')
1213            dproject = tbparams[dtb].get('project', 'project')
1214            sdomain = ".%s.%s%s" % (eid, sproject,
1215                    tbparams[gw].get('domain', ".example.com"))
1216            ddomain = ".%s.%s%s" % (eid, dproject,
1217                    tbparams[dtb].get('domain', ".example.com"))
1218            boss = tbparams[master].get('boss', "boss")
1219            fs = tbparams[master].get('fs', "fs")
1220            event_server = "%s%s" % \
1221                    (tbparams[gw].get('eventserver', "event_server"),
1222                            tbparams[gw].get('domain', "example.com"))
1223            remote_event_server = "%s%s" % \
1224                    (tbparams[dtb].get('eventserver', "event_server"),
1225                            tbparams[dtb].get('domain', "example.com"))
1226            seer_control = "%s%s" % \
1227                    (tbparams[gw].get('control', "control"), sdomain)
1228            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1229
1230            if self.fedkit:
1231                remote_script_dir = "/usr/local/federation/bin"
1232                local_script_dir = "/usr/local/federation/bin"
1233            else:
1234                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1235                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1236
1237            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1238            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1239            tunnel_cfg = tbparams[gw].get("tun", "false")
1240
1241            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1242            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1243
1244            # translate to lower case so the `hostname` hack for specifying
1245            # configuration files works.
1246            conf_file = conf_file.lower();
1247            remote_conf_file = remote_conf_file.lower();
1248
1249            if dtb == master:
1250                active = "false"
1251            elif gw == master:
1252                active = "true"
1253            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1254                active = "false"
1255            else:
1256                active_end['%s-%s' % (gw, dtb)] = 1
1257                active = "true"
1258
1259            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1260            print >>gwconfig, "Active: %s" % active
1261            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1262            if tunnel_iface:
1263                print >>gwconfig, "Interface: %s" % tunnel_iface
1264            print >>gwconfig, "BossName: %s" % boss
1265            print >>gwconfig, "FsName: %s" % fs
1266            print >>gwconfig, "EventServerName: %s" % event_server
1267            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1268            print >>gwconfig, "SeerControl: %s" % seer_control
1269            print >>gwconfig, "Type: %s" % type
1270            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1271            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1272                    local_script_dir
1273            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1274            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1275            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1276                    (remote_conf_dir, remote_conf_file)
1277            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1278            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1279            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1280            gwconfig.close()
1281
1282            return active == "true"
1283
1284        def __call__(self, line, allocated, tbparams):
1285            # Process gateways
1286            if not self.current_gateways:
1287                m = self.begin_gateways.match(line)
1288                if m:
1289                    self.current_gateways = m.group(1)
1290                    if allocated.has_key(self.current_gateways):
1291                        # This test should always succeed
1292                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1293                        if not os.path.exists(tb_dir):
1294                            try:
1295                                os.mkdir(tb_dir)
1296                            except IOError:
1297                                raise service_error(service_error.internal,
1298                                        "Cannot create %s" % tb_dir)
1299                    else:
1300                        # XXX
1301                        self.log.error("[gateways]: Ignoring gateways for " + \
1302                                "unknown testbed %s" % self.current_gateways)
1303                        self.current_gateways = None
1304                    return True
1305                else:
1306                    return False
1307            else:
1308                m = self.end_gateways.match(line)
1309                if m :
1310                    if m.group(1) != self.current_gateways:
1311                        raise service_error(service_error.internal,
1312                                "Mismatched gateway markers!?")
1313                    if self.control_gateway:
1314                        try:
1315                            cc = open("%s/%s/client.conf" %
1316                                    (self.tmpdir, self.current_gateways), 'w')
1317                            print >>cc, "ControlGateway: %s" % \
1318                                    self.control_gateway
1319                            if tbparams[self.master].has_key('smbshare'):
1320                                print >>cc, "SMBSHare: %s" % \
1321                                        tbparams[self.master]['smbshare']
1322                            print >>cc, "ProjectUser: %s" % \
1323                                    tbparams[self.master]['user']
1324                            print >>cc, "ProjectName: %s" % \
1325                                    tbparams[self.master]['project']
1326                            cc.close()
1327                        except IOError:
1328                            raise service_error(service_error.internal,
1329                                    "Error creating client config")
1330                        try:
1331                            cc = open("%s/%s/seer.conf" %
1332                                    (self.tmpdir, self.current_gateways),
1333                                    'w')
1334                            if self.current_gateways != self.master:
1335                                print >>cc, "ControlNode: %s" % \
1336                                        self.control_gateway
1337                            print >>cc, "ExperimentID: %s/%s" % \
1338                                    ( tbparams[self.master]['project'], \
1339                                    self.eid )
1340                            cc.close()
1341                        except IOError:
1342                            raise service_error(service_error.internal,
1343                                    "Error creating seer config")
1344                    else:
1345                        debug.error("[gateways]: No control gateway for %s" %\
1346                                    self.current_gateways)
1347                    self.current_gateways = None
1348                else:
1349                    dtb, myname, desthost, type = line.split(" ")
1350
1351                    if type == "control" or type == "both":
1352                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1353                                self.eid, 
1354                                tbparams[self.current_gateways]['project'],
1355                                tbparams[self.current_gateways]['domain'])
1356                    try:
1357                        active = self.gateway_conf_file(self.current_gateways,
1358                                self.master, self.eid, self.gw_pubkey_base,
1359                                self.gw_secretkey_base,
1360                                self.active_end, tbparams, dtb, myname,
1361                                desthost, type)
1362                    except IOError, e:
1363                        raise service_error(service_error.internal,
1364                                "Failed to write config file for %s" % \
1365                                        self.current_gateway)
1366           
1367                    gw_pubkey = "%s/keys/%s" % \
1368                            (self.tmpdir, self.gw_pubkey_base)
1369                    gw_secretkey = "%s/keys/%s" % \
1370                            (self.tmpdir, self.gw_secretkey_base)
1371
1372                    pkfile = "%s/%s/%s" % \
1373                            ( self.tmpdir, self.current_gateways, 
1374                                    self.gw_pubkey_base)
1375                    skfile = "%s/%s/%s" % \
1376                            ( self.tmpdir, self.current_gateways, 
1377                                    self.gw_secretkey_base)
1378
1379                    if not os.path.exists(pkfile):
1380                        try:
1381                            self.copy_file(gw_pubkey, pkfile)
1382                        except IOError:
1383                            service_error(service_error.internal,
1384                                    "Failed to copy pubkey file")
1385
1386                    if active and not os.path.exists(skfile):
1387                        try:
1388                            self.copy_file(gw_secretkey, skfile)
1389                        except IOError:
1390                            service_error(service_error.internal,
1391                                    "Failed to copy secretkey file")
1392                return True
1393
1394    class shunt_to_file:
1395        """
1396        Simple class to write data between two regexps to a file.
1397        """
1398        def __init__(self, begin, end, filename):
1399            """
1400            Begin shunting on a match of begin, stop on end, send data to
1401            filename.
1402            """
1403            self.begin = re.compile(begin)
1404            self.end = re.compile(end)
1405            self.in_shunt = False
1406            self.file = None
1407            self.filename = filename
1408
1409        def __call__(self, line):
1410            """
1411            Call this on each line in the input that may be shunted.
1412            """
1413            if not self.in_shunt:
1414                if self.begin.match(line):
1415                    self.in_shunt = True
1416                    try:
1417                        self.file = open(self.filename, "w")
1418                    except:
1419                        self.file = None
1420                        raise
1421                    return True
1422                else:
1423                    return False
1424            else:
1425                if self.end.match(line):
1426                    if self.file: 
1427                        self.file.close()
1428                        self.file = None
1429                    self.in_shunt = False
1430                else:
1431                    if self.file:
1432                        print >>self.file, line
1433                return True
1434
1435    class shunt_to_list:
1436        """
1437        Same interface as shunt_to_file.  Data collected in self.list, one list
1438        element per line.
1439        """
1440        def __init__(self, begin, end):
1441            self.begin = re.compile(begin)
1442            self.end = re.compile(end)
1443            self.in_shunt = False
1444            self.list = [ ]
1445       
1446        def __call__(self, line):
1447            if not self.in_shunt:
1448                if self.begin.match(line):
1449                    self.in_shunt = True
1450                    return True
1451                else:
1452                    return False
1453            else:
1454                if self.end.match(line):
1455                    self.in_shunt = False
1456                else:
1457                    self.list.append(line)
1458                return True
1459
1460    class shunt_to_string:
1461        """
1462        Same interface as shunt_to_file.  Data collected in self.str, all in
1463        one string.
1464        """
1465        def __init__(self, begin, end):
1466            self.begin = re.compile(begin)
1467            self.end = re.compile(end)
1468            self.in_shunt = False
1469            self.str = ""
1470       
1471        def __call__(self, line):
1472            if not self.in_shunt:
1473                if self.begin.match(line):
1474                    self.in_shunt = True
1475                    return True
1476                else:
1477                    return False
1478            else:
1479                if self.end.match(line):
1480                    self.in_shunt = False
1481                else:
1482                    self.str += line
1483                return True
1484
1485    def create_experiment(self, req, fid):
1486        """
1487        The external interface to experiment creation called from the
1488        dispatcher.
1489
1490        Creates a working directory, splits the incoming description using the
1491        splitter script and parses out the avrious subsections using the
1492        lcasses above.  Once each sub-experiment is created, use pooled threads
1493        to instantiate them and start it all up.
1494        """
1495
1496        if not self.auth.check_attribute(fid, 'create'):
1497            raise service_error(service_error.access, "Create access denied")
1498
1499        try:
1500            tmpdir = tempfile.mkdtemp(prefix="split-")
1501        except IOError:
1502            raise service_error(service_error.internal, "Cannot create tmp dir")
1503
1504        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1505        gw_secretkey_base = "fed.%s" % self.ssh_type
1506        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1507        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1508        tclfile = tmpdir + "/experiment.tcl"
1509        tbparams = { }
1510        try:
1511            access_user = self.accessdb[fid]
1512        except KeyError:
1513            raise service_error(service_error.internal,
1514                    "Access map and authorizer out of sync in " + \
1515                            "create_experiment for fedid %s"  % fid)
1516
1517        pid = "dummy"
1518        gid = "dummy"
1519        # XXX
1520        fail_soft = False
1521
1522        try:
1523            os.mkdir(tmpdir+"/keys")
1524        except OSError:
1525            raise service_error(service_error.internal,
1526                    "Can't make temporary dir")
1527
1528        req = req.get('CreateRequestBody', None)
1529        if not req:
1530            raise service_error(service_error.req,
1531                    "Bad request format (no CreateRequestBody)")
1532        # The tcl parser needs to read a file so put the content into that file
1533        descr=req.get('experimentdescription', None)
1534        if descr:
1535            file_content=descr.get('ns2description', None)
1536            if file_content:
1537                try:
1538                    f = open(tclfile, 'w')
1539                    f.write(file_content)
1540                    f.close()
1541                except IOError:
1542                    raise service_error(service_error.internal,
1543                            "Cannot write temp experiment description")
1544            else:
1545                raise service_error(service_error.req, 
1546                        "Only ns2descriptions supported")
1547        else:
1548            raise service_error(service_error.req, "No experiment description")
1549
1550        if req.has_key('experimentID') and \
1551                req['experimentID'].has_key('localname'):
1552            eid = req['experimentID']['localname']
1553            self.state_lock.acquire()
1554            while (self.state.has_key(eid)):
1555                eid += random.choice(string.ascii_letters)
1556            # To avoid another thread picking this localname
1557            self.state[eid] = "placeholder"
1558            self.state_lock.release()
1559        else:
1560            eid = self.exp_stem
1561            for i in range(0,5):
1562                eid += random.choice(string.ascii_letters)
1563            self.state_lock.acquire()
1564            while (self.state.has_key(eid)):
1565                eid = self.exp_stem
1566                for i in range(0,5):
1567                    eid += random.choice(string.ascii_letters)
1568            # To avoid another thread picking this localname
1569            self.state[eid] = "placeholder"
1570            self.state_lock.release()
1571
1572        try: 
1573            # This catches exceptions to clear the placeholder if necessary
1574            try:
1575                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1576            except ValueError:
1577                raise service_error(service_error.server_config, 
1578                        "Bad key type (%s)" % self.ssh_type)
1579
1580            user = req.get('user', None)
1581            if user == None:
1582                raise service_error(service_error.req, "No user")
1583
1584            master = req.get('master', None)
1585            if not master:
1586                raise service_error(service_error.req,
1587                        "No master testbed label")
1588            export_project = req.get('exportProject', None)
1589            if not export_project:
1590                raise service_error(service_error.req, "No export project")
1591           
1592            if self.splitter_url:
1593                self.log.debug("Calling remote splitter at %s" % \
1594                        self.splitter_url)
1595                split_data = self.remote_splitter(self.splitter_url,
1596                        file_content, master)
1597            else:
1598                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1599                    str(self.muxmax), '-m', master]
1600
1601                if self.fedkit:
1602                    tclcmd.append('-k')
1603
1604                tclcmd.extend([pid, gid, eid, tclfile])
1605
1606                self.log.debug("running local splitter %s", " ".join(tclcmd))
1607                tclparser = Popen(tclcmd, stdout=PIPE)
1608                split_data = tclparser.stdout
1609
1610            allocated = { }         # Testbeds we can access
1611            started = { }           # Testbeds where a sub-experiment started
1612                                # successfully
1613
1614            # Objects to parse the splitter output (defined above)
1615            parse_current_testbed = self.current_testbed(eid, tmpdir,
1616                    self.fedkit)
1617            parse_allbeds = self.allbeds(self.get_access)
1618            parse_gateways = self.gateways(eid, master, tmpdir,
1619                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1620                    self.fedkit)
1621            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1622                        "^#\s+End\s+Vtopo")
1623            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1624                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1625            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1626                    "^#\s+End\s+tarfiles")
1627            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1628                    "^#\s+End\s+rpms")
1629
1630            # Working on the split data
1631            for line in split_data:
1632                line = line.rstrip()
1633                if parse_current_testbed(line, master, allocated, tbparams):
1634                    continue
1635                elif parse_allbeds(line, user, tbparams, master, export_project,
1636                        access_user):
1637                    continue
1638                elif parse_gateways(line, allocated, tbparams):
1639                    continue
1640                elif parse_vtopo(line):
1641                    continue
1642                elif parse_hostnames(line):
1643                    continue
1644                elif parse_tarfiles(line):
1645                    continue
1646                elif parse_rpms(line):
1647                    continue
1648                else:
1649                    raise service_error(service_error.internal, 
1650                            "Bad tcl parse? %s" % line)
1651            # Virtual topology and visualization
1652            vtopo = self.gentopo(parse_vtopo.str)
1653            if not vtopo:
1654                raise service_error(service_error.internal, 
1655                        "Failed to generate virtual topology")
1656
1657            vis = self.genviz(vtopo)
1658            if not vis:
1659                raise service_error(service_error.internal, 
1660                        "Failed to generate visualization")
1661           
1662            # save federant information
1663            for k in allocated.keys():
1664                tbparams[k]['federant'] = {\
1665                        'name': [ { 'localname' : eid} ],\
1666                        'emulab': tbparams[k]['emulab'],\
1667                        'allocID' : tbparams[k]['allocID'],\
1668                        'master' : k == master,\
1669                    }
1670
1671
1672            # Copy tarfiles and rpms needed at remote sites into a staging area
1673            try:
1674                if self.fedkit:
1675                    parse_tarfiles.list.append(self.fedkit)
1676                for t in parse_tarfiles.list:
1677                    if not os.path.exists("%s/tarfiles" % tmpdir):
1678                        os.mkdir("%s/tarfiles" % tmpdir)
1679                    self.copy_file(t, "%s/tarfiles/%s" % \
1680                            (tmpdir, os.path.basename(t)))
1681                for r in parse_rpms.list:
1682                    if not os.path.exists("%s/rpms" % tmpdir):
1683                        os.mkdir("%s/rpms" % tmpdir)
1684                    self.copy_file(r, "%s/rpms/%s" % \
1685                            (tmpdir, os.path.basename(r)))
1686            except IOError, e:
1687                raise service_error(service_error.internal, 
1688                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1689
1690        except service_error, e:
1691            # If something goes wrong in the parse (usually an access error)
1692            # clear the placeholder state.  From here on out the code delays
1693            # exceptions.
1694            self.state_lock.acquire()
1695            del self.state[eid]
1696            self.state_lock.release()
1697            raise e
1698
1699        thread_pool_info = self.thread_pool()
1700        threads = [ ]
1701
1702        for tb in [ k for k in allocated.keys() if k != master]:
1703            # Wait until we have a free slot to start the next testbed load
1704            thread_pool_info.acquire()
1705            while thread_pool_info.started - \
1706                    thread_pool_info.terminated >= self.nthreads:
1707                thread_pool_info.wait()
1708            thread_pool_info.release()
1709
1710            # Create and start a thread to start the segment, and save it to
1711            # get the return value later
1712            t  = self.pooled_thread(target=self.start_segment, 
1713                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1714                    pdata=thread_pool_info, trace_file=self.trace_file)
1715            threads.append(t)
1716            t.start()
1717
1718        # Wait until all finish (the first clause of the while is to make sure
1719        # one starts)
1720        thread_pool_info.acquire()
1721        while thread_pool_info.started == 0 or \
1722                thread_pool_info.started > thread_pool_info.terminated:
1723            thread_pool_info.wait()
1724        thread_pool_info.release()
1725
1726        # If none failed, start the master
1727        failed = [ t.getName() for t in threads if not t.rv ]
1728
1729        if len(failed) == 0:
1730            if not self.start_segment(master, eid, tbparams, tmpdir):
1731                failed.append(master)
1732
1733        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1734        # If one failed clean up, unless fail_soft is set
1735        if failed:
1736            if not fail_soft:
1737                for tb in succeeded:
1738                    self.stop_segment(tb, eid, tbparams)
1739                # release the allocations
1740                for tb in tbparams.keys():
1741                    self.release_access(tb, tbparams[tb]['allocID'])
1742                # Remove the placeholder
1743                self.state_lock.acquire()
1744                del self.state[eid]
1745                self.state_lock.release()
1746
1747                raise service_error(service_error.federant,
1748                    "Swap in failed on %s" % ",".join(failed))
1749        else:
1750            self.log.info("[start_segment]: Experiment %s started" % eid)
1751
1752        # Generate an ID for the experiment (slice) and a certificate that the
1753        # allocator can use to prove they own it.  We'll ship it back through
1754        # the encrypted connection.
1755        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1756
1757        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1758
1759        # Walk up tmpdir, deleting as we go
1760        for path, dirs, files in os.walk(tmpdir, topdown=False):
1761            for f in files:
1762                os.remove(os.path.join(path, f))
1763            for d in dirs:
1764                os.rmdir(os.path.join(path, d))
1765        os.rmdir(tmpdir)
1766
1767        # The deepcopy prevents the allocation ID and other binaries from being
1768        # translated into other formats
1769        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1770                for tb in tbparams.keys() \
1771                    if tbparams[tb].has_key('federant') ],\
1772                    'vtopo': vtopo,\
1773                    'vis' : vis,
1774                    'experimentID' : [\
1775                            { 'fedid': copy.copy(expid) }, \
1776                            { 'localname': eid },\
1777                        ],\
1778                    'experimentAccess': { 'X509' : expcert },\
1779                }
1780
1781        # Insert the experiment into our state and update the disk copy
1782        self.state_lock.acquire()
1783        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1784                for tb in tbparams.keys() \
1785                    if tbparams[tb].has_key('federant') ],\
1786                    'vtopo': vtopo,\
1787                    'vis' : vis,
1788                    'owner': fid,
1789                    'experimentID' : [\
1790                            { 'fedid': expid }, { 'localname': eid },\
1791                        ],\
1792                }
1793        self.state[eid] = self.state[expid]
1794        if self.state_filename: self.write_state()
1795        self.state_lock.release()
1796
1797        self.auth.set_attribute(fid, expid)
1798        self.auth.set_attribute(expid, expid)
1799
1800        if not failed:
1801            return resp
1802        else:
1803            raise service_error(service_error.partial, \
1804                    "Partial swap in on %s" % ",".join(succeeded))
1805
1806    def check_experiment_access(self, fid, key):
1807        """
1808        Confirm that the fid has access to the experiment.  Though a request
1809        may be made in terms of a local name, the access attribute is always
1810        the experiment's fedid.
1811        """
1812        if not isinstance(key, fedid):
1813            self.state_lock.acquire()
1814            if self.state.has_key(key):
1815                try:
1816                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1817                            if f.has_key('fedid') ]
1818                except KeyError:
1819                    self.state_lock.release()
1820                    raise service_error(service_error.internal, 
1821                            "No fedid for experiment %s when checking " +\
1822                                    "access(!?)" % key)
1823                if len(kl) == 1:
1824                    key = kl[0]
1825                else:
1826                    self.state_lock.release()
1827                    raise service_error(service_error.internal, 
1828                            "multiple fedids for experiment %s when " +\
1829                                    "checking access(!?)" % key)
1830            else:
1831                self.state_lock.release()
1832                raise service_error(service_error.access, "Access Denied")
1833            self.state_lock.release()
1834
1835        if self.auth.check_attribute(fid, key):
1836            return True
1837        else:
1838            raise service_error(service_error.access, "Access Denied")
1839
1840
1841
1842    def get_vtopo(self, req, fid):
1843        """
1844        Return the stored virtual topology for this experiment
1845        """
1846        rv = None
1847
1848        req = req.get('VtopoRequestBody', None)
1849        if not req:
1850            raise service_error(service_error.req,
1851                    "Bad request format (no VtopoRequestBody)")
1852        exp = req.get('experiment', None)
1853        if exp:
1854            if exp.has_key('fedid'):
1855                key = exp['fedid']
1856                keytype = "fedid"
1857            elif exp.has_key('localname'):
1858                key = exp['localname']
1859                keytype = "localname"
1860            else:
1861                raise service_error(service_error.req, "Unknown lookup type")
1862        else:
1863            raise service_error(service_error.req, "No request?")
1864
1865        self.check_experiment_access(fid, key)
1866
1867        self.state_lock.acquire()
1868        if self.state.has_key(key):
1869            rv = { 'experiment' : {keytype: key },\
1870                    'vtopo': self.state[key]['vtopo'],\
1871                }
1872        self.state_lock.release()
1873
1874        if rv: return rv
1875        else: raise service_error(service_error.req, "No such experiment")
1876
1877    def get_vis(self, req, fid):
1878        """
1879        Return the stored visualization for this experiment
1880        """
1881        rv = None
1882
1883        req = req.get('VisRequestBody', None)
1884        if not req:
1885            raise service_error(service_error.req,
1886                    "Bad request format (no VisRequestBody)")
1887        exp = req.get('experiment', None)
1888        if exp:
1889            if exp.has_key('fedid'):
1890                key = exp['fedid']
1891                keytype = "fedid"
1892            elif exp.has_key('localname'):
1893                key = exp['localname']
1894                keytype = "localname"
1895            else:
1896                raise service_error(service_error.req, "Unknown lookup type")
1897        else:
1898            raise service_error(service_error.req, "No request?")
1899
1900        self.check_experiment_access(fid, key)
1901
1902        self.state_lock.acquire()
1903        if self.state.has_key(key):
1904            rv =  { 'experiment' : {keytype: key },\
1905                    'vis': self.state[key]['vis'],\
1906                    }
1907        self.state_lock.release()
1908
1909        if rv: return rv
1910        else: raise service_error(service_error.req, "No such experiment")
1911
1912    def get_info(self, req, fid):
1913        """
1914        Return all the stored info about this experiment
1915        """
1916        rv = None
1917
1918        req = req.get('InfoRequestBody', None)
1919        if not req:
1920            raise service_error(service_error.req,
1921                    "Bad request format (no VisRequestBody)")
1922        exp = req.get('experiment', None)
1923        if exp:
1924            if exp.has_key('fedid'):
1925                key = exp['fedid']
1926                keytype = "fedid"
1927            elif exp.has_key('localname'):
1928                key = exp['localname']
1929                keytype = "localname"
1930            else:
1931                raise service_error(service_error.req, "Unknown lookup type")
1932        else:
1933            raise service_error(service_error.req, "No request?")
1934
1935        self.check_experiment_access(fid, key)
1936
1937        # The state may be massaged by the service function that called
1938        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1939        # state.
1940        self.state_lock.acquire()
1941        if self.state.has_key(key):
1942            rv = copy.deepcopy(self.state[key])
1943        self.state_lock.release()
1944
1945        if rv: return rv
1946        else: raise service_error(service_error.req, "No such experiment")
1947
1948
1949    def terminate_experiment(self, req, fid):
1950        """
1951        Swap this experiment out on the federants and delete the shared
1952        information
1953        """
1954        tbparams = { }
1955        req = req.get('TerminateRequestBody', None)
1956        if not req:
1957            raise service_error(service_error.req,
1958                    "Bad request format (no TerminateRequestBody)")
1959        exp = req.get('experiment', None)
1960        if exp:
1961            if exp.has_key('fedid'):
1962                key = exp['fedid']
1963                keytype = "fedid"
1964            elif exp.has_key('localname'):
1965                key = exp['localname']
1966                keytype = "localname"
1967            else:
1968                raise service_error(service_error.req, "Unknown lookup type")
1969        else:
1970            raise service_error(service_error.req, "No request?")
1971
1972        self.check_experiment_access(fid, key)
1973
1974        self.state_lock.acquire()
1975        fed_exp = self.state.get(key, None)
1976
1977        if fed_exp:
1978            # This branch of the conditional holds the lock to generate a
1979            # consistent temporary tbparams variable to deallocate experiments.
1980            # It releases the lock to do the deallocations and reacquires it to
1981            # remove the experiment state when the termination is complete.
1982            ids = []
1983            #  experimentID is a list of dicts that are self-describing
1984            #  identifiers.  This finds all the fedids and localnames - the
1985            #  keys of self.state - and puts them into ids.
1986            for id in fed_exp.get('experimentID', []):
1987                if id.has_key('fedid'): ids.append(id['fedid'])
1988                if id.has_key('localname'): ids.append(id['localname'])
1989
1990            # Construct enough of the tbparams to make the stop_segment calls
1991            # work
1992            for fed in fed_exp['federant']:
1993                try:
1994                    for e in fed['name']:
1995                        eid = e.get('localname', None)
1996                        if eid: break
1997                    else:
1998                        continue
1999
2000                    p = fed['emulab']['project']
2001
2002                    project = p['name']['localname']
2003                    tb = p['testbed']['localname']
2004                    user = p['user'][0]['userID']['localname']
2005
2006                    domain = fed['emulab']['domain']
2007                    host  = fed['emulab']['ops']
2008                    aid = fed['allocID']
2009                except KeyError, e:
2010                    continue
2011                tbparams[tb] = {\
2012                        'user': user,\
2013                        'domain': domain,\
2014                        'project': project,\
2015                        'host': host,\
2016                        'eid': eid,\
2017                        'aid': aid,\
2018                    }
2019            self.state_lock.release()
2020
2021            # Stop everyone.
2022            for tb in tbparams.keys():
2023                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2024
2025            # release the allocations
2026            for tb in tbparams.keys():
2027                self.release_access(tb, tbparams[tb]['aid'])
2028
2029            # Remove the terminated experiment
2030            self.state_lock.acquire()
2031            for id in ids:
2032                if self.state.has_key(id): del self.state[id]
2033
2034            if self.state_filename: self.write_state()
2035            self.state_lock.release()
2036
2037            return { 'experiment': exp }
2038        else:
2039            # Don't forget to release the lock
2040            self.state_lock.release()
2041            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.