source: fedd/federation/experiment_control.py @ c8c2c64

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since c8c2c64 was 8bc5754, checked in by Ted Faber <faber@…>, 16 years ago

Parallelize testbed swapouts, including failed creations and terminate calls.
I also cleaned up the thread_pool interface some to aviod cutting and pasting
rather cryptic code throughout those functions. The interface is still a
little odd, but much more palatable.

  • Property mode set to 100644
File size: 62.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167        self.thread_with_rv = experiment_control_local.pooled_thread
168        self.thread_pool = experiment_control_local.thread_pool
169
170        self.cert_file = config.get("experiment_control", "cert_file")
171        if self.cert_file:
172            self.cert_pwd = config.get("experiment_control", "cert_pwd")
173        else:
174            self.cert_file = config.get("globals", "cert_file")
175            self.cert_pwd = config.get("globals", "cert_pwd")
176
177        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
178                or config.get("globals", "trusted_certs")
179
180        self.exp_stem = "fed-stem"
181        self.log = logging.getLogger("fedd.experiment_control")
182        set_log_level(config, "experiment_control", self.log)
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193
194        self.debug = config.getboolean("experiment_control", "create_debug")
195        self.state_filename = config.get("experiment_control", 
196                "experiment_state")
197        self.splitter_url = config.get("experiment_control", "splitter_uri")
198        self.fedkit = config.get("experiment_control", "fedkit")
199        accessdb_file = config.get("experiment_control", "accessdb")
200
201        self.ssh_pubkey_file = config.get("experiment_control", 
202                "ssh_pubkey_file")
203        self.ssh_privkey_file = config.get("experiment_control",
204                "ssh_privkey_file")
205        # NB for internal master/slave ops, not experiment setup
206        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
207        self.state = { }
208        self.state_lock = Lock()
209        self.tclsh = "/usr/local/bin/otclsh"
210        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
211                config.get("experiment_control", "tcl_splitter",
212                        "/usr/testbed/lib/ns2ir/parse.tcl")
213        mapdb_file = config.get("experiment_control", "mapdb")
214        self.trace_file = sys.stderr
215
216        self.def_expstart = \
217                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
218                "/tmp/federate";
219        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
220                "FEDDIR/hosts";
221        self.def_gwstart = \
222                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
223                "/tmp/bridge.log";
224        self.def_mgwstart = \
225                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
226                "/tmp/bridge.log";
227        self.def_gwimage = "FBSD61-TUNNEL2";
228        self.def_gwtype = "pc";
229        self.local_access = { }
230
231        if auth:
232            self.auth = auth
233        else:
234            self.log.error(\
235                    "[access]: No authorizer initialized, creating local one.")
236            auth = authorizer()
237
238
239        if self.ssh_pubkey_file:
240            try:
241                f = open(self.ssh_pubkey_file, 'r')
242                self.ssh_pubkey = f.read()
243                f.close()
244            except IOError:
245                raise service_error(service_error.internal,
246                        "Cannot read sshpubkey")
247        else:
248            raise service_error(service_error.internal, 
249                    "No SSH public key file?")
250
251        if not self.ssh_privkey_file:
252            raise service_error(service_error.internal, 
253                    "No SSH public key file?")
254
255
256        if mapdb_file:
257            self.read_mapdb(mapdb_file)
258        else:
259            self.log.warn("[experiment_control] No testbed map, using defaults")
260            self.tbmap = { 
261                    'deter':'https://users.isi.deterlab.net:23235',
262                    'emulab':'https://users.isi.deterlab.net:23236',
263                    'ucb':'https://users.isi.deterlab.net:23237',
264                    }
265
266        if accessdb_file:
267                self.read_accessdb(accessdb_file)
268        else:
269            raise service_error(service_error.internal,
270                    "No accessdb specified in config")
271
272        # Grab saved state.  OK to do this w/o locking because it's read only
273        # and only one thread should be in existence that can see self.state at
274        # this point.
275        if self.state_filename:
276            self.read_state()
277
278        # Dispatch tables
279        self.soap_services = {\
280                'Create': soap_handler('Create', self.create_experiment),
281                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
282                'Vis': soap_handler('Vis', self.get_vis),
283                'Info': soap_handler('Info', self.get_info),
284                'Terminate': soap_handler('Terminate', 
285                    self.terminate_experiment),
286        }
287
288        self.xmlrpc_services = {\
289                'Create': xmlrpc_handler('Create', self.create_experiment),
290                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
291                'Vis': xmlrpc_handler('Vis', self.get_vis),
292                'Info': xmlrpc_handler('Info', self.get_info),
293                'Terminate': xmlrpc_handler('Terminate',
294                    self.terminate_experiment),
295        }
296
297    def copy_file(self, src, dest, size=1024):
298        """
299        Exceedingly simple file copy.
300        """
301        s = open(src,'r')
302        d = open(dest, 'w')
303
304        buf = "x"
305        while buf != "":
306            buf = s.read(size)
307            d.write(buf)
308        s.close()
309        d.close()
310
311    # Call while holding self.state_lock
312    def write_state(self):
313        """
314        Write a new copy of experiment state after copying the existing state
315        to a backup.
316
317        State format is a simple pickling of the state dictionary.
318        """
319        if os.access(self.state_filename, os.W_OK):
320            self.copy_file(self.state_filename, \
321                    "%s.bak" % self.state_filename)
322        try:
323            f = open(self.state_filename, 'w')
324            pickle.dump(self.state, f)
325        except IOError, e:
326            self.log.error("Can't write file %s: %s" % \
327                    (self.state_filename, e))
328        except pickle.PicklingError, e:
329            self.log.error("Pickling problem: %s" % e)
330        except TypeError, e:
331            self.log.error("Pickling problem (TypeError): %s" % e)
332
333    # Call while holding self.state_lock
334    def read_state(self):
335        """
336        Read a new copy of experiment state.  Old state is overwritten.
337
338        State format is a simple pickling of the state dictionary.
339        """
340        try:
341            f = open(self.state_filename, "r")
342            self.state = pickle.load(f)
343            self.log.debug("[read_state]: Read state from %s" % \
344                    self.state_filename)
345        except IOError, e:
346            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
347                    % (self.state_filename, e))
348        except pickle.UnpicklingError, e:
349            self.log.warning(("[read_state]: No saved state: " + \
350                    "Unpickling failed: %s") % e)
351       
352        for k in self.state.keys():
353            try:
354                # This list should only have one element in it, but phrasing it
355                # as a for loop doesn't cost much, really.  We have to find the
356                # fedid elements anyway.
357                for eid in [ f['fedid'] \
358                        for f in self.state[k]['experimentID']\
359                            if f.has_key('fedid') ]:
360                    self.auth.set_attribute(self.state[k]['owner'], eid)
361            except KeyError, e:
362                self.log.warning("[read_state]: State ownership or identity " +\
363                        "misformatted in %s: %s" % (self.state_filename, e))
364
365
366    def read_accessdb(self, accessdb_file):
367        """
368        Read the mapping from fedids that can create experiments to their name
369        in the 3-level access namespace.  All will be asserted from this
370        testbed and can include the local username and porject that will be
371        asserted on their behalf by this fedd.  Each fedid is also added to the
372        authorization system with the "create" attribute.
373        """
374        self.accessdb = {}
375        # These are the regexps for parsing the db
376        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
377        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
378                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
379        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
380                "\s*->\s*(" + name_expr + ")\s*$")
381        lineno = 0
382
383        # Parse the mappings and store in self.authdb, a dict of
384        # fedid -> (proj, user)
385        try:
386            f = open(accessdb_file, "r")
387            for line in f:
388                lineno += 1
389                line = line.strip()
390                if len(line) == 0 or line.startswith('#'):
391                    continue
392                m = project_line.match(line)
393                if m:
394                    fid = fedid(hexstr=m.group(1))
395                    project, user = m.group(2,3)
396                    if not self.accessdb.has_key(fid):
397                        self.accessdb[fid] = []
398                    self.accessdb[fid].append((project, user))
399                    continue
400
401                m = user_line.match(line)
402                if m:
403                    fid = fedid(hexstr=m.group(1))
404                    project = None
405                    user = m.group(2)
406                    if not self.accessdb.has_key(fid):
407                        self.accessdb[fid] = []
408                    self.accessdb[fid].append((project, user))
409                    continue
410                self.log.warn("[experiment_control] Error parsing access " +\
411                        "db %s at line %d" %  (accessdb_file, lineno))
412        except IOError:
413            raise service_error(service_error.internal,
414                    "Error opening/reading %s as experiment " +\
415                            "control accessdb" %  accessdb_file)
416        f.close()
417
418        # Initialize the authorization attributes
419        for fid in self.accessdb.keys():
420            self.auth.set_attribute(fid, 'create')
421
422    def read_mapdb(self, file):
423        """
424        Read a simple colon separated list of mappings for the
425        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
426        """
427
428        self.tbmap = { }
429        lineno =0
430        try:
431            f = open(file, "r")
432            for line in f:
433                lineno += 1
434                line = line.strip()
435                if line.startswith('#') or len(line) == 0:
436                    continue
437                try:
438                    label, url = line.split(':', 1)
439                    self.tbmap[label] = url
440                except ValueError, e:
441                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
442                            "map db: %s %s" % (lineno, line, e))
443        except IOError, e:
444            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
445                    "open %s: %s" % (file, e))
446        f.close()
447
448    def scp_file(self, file, user, host, dest=""):
449        """
450        scp a file to the remote host.  If debug is set the action is only
451        logged.
452        """
453
454        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
455                '-o', 'StrictHostKeyChecking yes', '-i', 
456                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
457        rv = 0
458
459        try:
460            dnull = open("/dev/null", "w")
461        except IOError:
462            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
463            dnull = Null
464
465        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
466        if not self.debug:
467            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
468
469        return rv == 0
470
471    def ssh_cmd(self, user, host, cmd, wname=None):
472        """
473        Run a remote command on host as user.  If debug is set, the action is
474        only logged.
475        """
476        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
477                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
478                (self.ssh_exec, self.ssh_privkey_file, 
479                        user, host, cmd)
480
481        try:
482            dnull = open("/dev/null", "r")
483        except IOError:
484            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
485            dnull = Null
486
487        self.log.debug("[ssh_cmd]: %s" % sh_str)
488        if not self.debug:
489            if dnull:
490                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
491            else:
492                sub = Popen(sh_str, shell=True)
493            return sub.wait() == 0
494        else:
495            return True
496
497    def ship_configs(self, host, user, src_dir, dest_dir):
498        """
499        Copy federant-specific configuration files to the federant.
500        """
501        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
502            return False
503        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
504            return False
505
506        for f in os.listdir(src_dir):
507            if os.path.isdir(f):
508                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
509                        "%s/%s" % (dest_dir, f)):
510                    return False
511            else:
512                if not self.scp_file("%s/%s" % (src_dir, f), 
513                        user, host, dest_dir):
514                    return False
515        return True
516
517    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
518        """
519        Start a sub-experiment on a federant.
520
521        Get the current state, modify or create as appropriate, ship data and
522        configs and start the experiment.  There are small ordering differences
523        based on the initial state of the sub-experiment.
524        """
525        # ops node in the federant
526        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
527        user = tbparams[tb]['user']     # federant user
528        pid = tbparams[tb]['project']   # federant project
529        # XXX
530        base_confs = ( "hosts",)
531        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
532        # command to test experiment state
533        expinfo_exec = "/usr/testbed/bin/expinfo" 
534        # Configuration directories on the remote machine
535        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
536        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
537        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
538        # Regular expressions to parse the expinfo response
539        state_re = re.compile("State:\s+(\w+)")
540        no_exp_re = re.compile("^No\s+such\s+experiment")
541        state = None    # Experiment state parsed from expinfo
542        # The expinfo ssh command.  Note the identity restriction to use only
543        # the identity provided in the pubkey given.
544        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
545                'StrictHostKeyChecking yes', '-i', 
546                self.ssh_privkey_file, "%s@%s" % (user, host), 
547                expinfo_exec, pid, eid]
548
549        # Get status
550        self.log.debug("[start_segment]: %s"% " ".join(cmd))
551        dev_null = None
552        try:
553            dev_null = open("/dev/null", "a")
554        except IOError, e:
555            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
556
557        if self.debug:
558            state = 'swapped'
559            rv = 0
560        else:
561            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
562            for line in status.stdout:
563                m = state_re.match(line)
564                if m: state = m.group(1)
565                else:
566                    m = no_exp_re.match(line)
567                    if m: state = "none"
568            rv = status.wait()
569
570        # If the experiment is not present the subcommand returns a non-zero
571        # return value.  If we successfully parsed a "none" outcome, ignore the
572        # return code.
573        if rv != 0 and state != "none":
574            raise service_error(service_error.internal,
575                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
576
577        self.log.debug("[start_segment]: %s: %s" % (tb, state))
578        self.log.info("[start_segment]:transferring experiment to %s" % tb)
579
580        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
581            return False
582        # Clear the federation config dirs
583        if not self.ssh_cmd(user, host, 
584                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
585            return False
586        # Clear and create the tarfiles and rpm directories
587        for d in (tarfiles_dir, rpms_dir):
588            if not self.ssh_cmd(user, host, 
589                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
590                return False
591            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
592                    "create tarfiles"):
593                return False
594       
595        if state == 'active':
596            # Create the federation config dirs (do not move outside the
597            # conditional.  Happens later in new expriment creation)
598            if not self.ssh_cmd(user, host, 
599                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
600                return False
601            # Remote experiment is active.  Modify it.
602            for f in base_confs:
603                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
604                        "%s/%s" % (proj_dir, f)):
605                    return False
606            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
607                    proj_dir):
608                return False
609            if os.path.isdir("%s/tarfiles" % tmpdir):
610                if not self.ship_configs(host, user,
611                        "%s/tarfiles" % tmpdir, tarfiles_dir):
612                    return False
613            if os.path.isdir("%s/rpms" % tmpdir):
614                if not self.ship_configs(host, user,
615                        "%s/rpms" % tmpdir, tarfiles_dir):
616                    return False
617            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
618            if not self.ssh_cmd(user, host,
619                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
620                            (pid, eid, tclfile), "modexp"):
621                return False
622            return True
623        elif state == "swapped":
624            # Create the federation config dirs (do not move outside the
625            # conditional.  Happens later in new expriment creation)
626            if not self.ssh_cmd(user, host, 
627                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
628                return False
629            # Remote experiment swapped out.  Modify it and swap it in.
630            for f in base_confs:
631                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
632                        "%s/%s" % (proj_dir, f)):
633                    return False
634            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
635                    proj_dir):
636                return False
637            if os.path.isdir("%s/tarfiles" % tmpdir):
638                if not self.ship_configs(host, user,
639                        "%s/tarfiles" % tmpdir, tarfiles_dir):
640                    return False
641            if os.path.isdir("%s/rpms" % tmpdir):
642                if not self.ship_configs(host, user,
643                        "%s/rpms" % tmpdir, tarfiles_dir):
644                    return False
645            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
646            if not self.ssh_cmd(user, host,
647                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
648                    "modexp"):
649                return False
650            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
651            if not self.ssh_cmd(user, host,
652                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
653                    "swapexp"):
654                return False
655            return True
656        elif state == "none":
657            # No remote experiment.  Create one.  We do this in 2 steps so we
658            # can put the configuration files and scripts into the new
659            # experiment directories.
660
661            # Tarfiles must be present for creation to work
662            if os.path.isdir("%s/tarfiles" % tmpdir):
663                if not self.ship_configs(host, user,
664                        "%s/tarfiles" % tmpdir, tarfiles_dir):
665                    return False
666            if os.path.isdir("%s/rpms" % tmpdir):
667                if not self.ship_configs(host, user,
668                        "%s/rpms" % tmpdir, tarfiles_dir):
669                    return False
670            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
671            if not self.ssh_cmd(user, host,
672                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
673                            (pid, eid, tclfile), "startexp"):
674                return False
675            # Create the federation config dirs (do not move outside the
676            # conditional.)
677            if not self.ssh_cmd(user, host, 
678                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
679                return False
680            # After startexp the per-experiment directories exist
681            for f in base_confs:
682                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
683                        "%s/%s" % (proj_dir, f)):
684                    return False
685            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
686                    proj_dir):
687                return False
688            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
689            if not self.ssh_cmd(user, host,
690                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
691                    "swapexp"):
692                return False
693            return True
694        else:
695            self.log.debug("[start_segment]:unknown state %s" % state)
696            return False
697
698    def stop_segment(self, tb, eid, tbparams):
699        """
700        Stop a sub experiment by calling swapexp on the federant
701        """
702        user = tbparams[tb]['user']
703        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
704        pid = tbparams[tb]['project']
705
706        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
707        return self.ssh_cmd(user, host,
708                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
709
710       
711    def generate_ssh_keys(self, dest, type="rsa" ):
712        """
713        Generate a set of keys for the gateways to use to talk.
714
715        Keys are of type type and are stored in the required dest file.
716        """
717        valid_types = ("rsa", "dsa")
718        t = type.lower();
719        if t not in valid_types: raise ValueError
720        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
721
722        try:
723            trace = open("/dev/null", "w")
724        except IOError:
725            raise service_error(service_error.internal,
726                    "Cannot open /dev/null??");
727
728        # May raise CalledProcessError
729        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
730        rv = call(cmd, stdout=trace, stderr=trace)
731        if rv != 0:
732            raise service_error(service_error.internal, 
733                    "Cannot generate nonce ssh keys.  %s return code %d" \
734                            % (self.ssh_keygen, rv))
735
736    def gentopo(self, str):
737        """
738        Generate the topology dtat structure from the splitter's XML
739        representation of it.
740
741        The topology XML looks like:
742            <experiment>
743                <nodes>
744                    <node><vname></vname><ips>ip1:ip2</ips></node>
745                </nodes>
746                <lans>
747                    <lan>
748                        <vname></vname><vnode></vnode><ip></ip>
749                        <bandwidth></bandwidth><member>node:port</member>
750                    </lan>
751                </lans>
752        """
753        class topo_parse:
754            """
755            Parse the topology XML and create the dats structure.
756            """
757            def __init__(self):
758                # Typing of the subelements for data conversion
759                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
760                self.int_subelements = ( 'bandwidth',)
761                self.float_subelements = ( 'delay',)
762                # The final data structure
763                self.nodes = [ ]
764                self.lans =  [ ]
765                self.topo = { \
766                        'node': self.nodes,\
767                        'lan' : self.lans,\
768                    }
769                self.element = { }  # Current element being created
770                self.chars = ""     # Last text seen
771
772            def end_element(self, name):
773                # After each sub element the contents is added to the current
774                # element or to the appropriate list.
775                if name == 'node':
776                    self.nodes.append(self.element)
777                    self.element = { }
778                elif name == 'lan':
779                    self.lans.append(self.element)
780                    self.element = { }
781                elif name in self.str_subelements:
782                    self.element[name] = self.chars
783                    self.chars = ""
784                elif name in self.int_subelements:
785                    self.element[name] = int(self.chars)
786                    self.chars = ""
787                elif name in self.float_subelements:
788                    self.element[name] = float(self.chars)
789                    self.chars = ""
790
791            def found_chars(self, data):
792                self.chars += data.rstrip()
793
794
795        tp = topo_parse();
796        parser = xml.parsers.expat.ParserCreate()
797        parser.EndElementHandler = tp.end_element
798        parser.CharacterDataHandler = tp.found_chars
799
800        parser.Parse(str)
801
802        return tp.topo
803       
804
805    def genviz(self, topo):
806        """
807        Generate the visualization the virtual topology
808        """
809
810        neato = "/usr/local/bin/neato"
811        # These are used to parse neato output and to create the visualization
812        # file.
813        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
814        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
815                "%s</type></node>"
816
817        try:
818            # Node names
819            nodes = [ n['vname'] for n in topo['node'] ]
820            topo_lans = topo['lan']
821        except KeyError:
822            raise service_error(service_error.internal, "Bad topology")
823
824        lans = { }
825        links = { }
826
827        # Walk through the virtual topology, organizing the connections into
828        # 2-node connections (links) and more-than-2-node connections (lans).
829        # When a lan is created, it's added to the list of nodes (there's a
830        # node in the visualization for the lan).
831        for l in topo_lans:
832            if links.has_key(l['vname']):
833                if len(links[l['vname']]) < 2:
834                    links[l['vname']].append(l['vnode'])
835                else:
836                    nodes.append(l['vname'])
837                    lans[l['vname']] = links[l['vname']]
838                    del links[l['vname']]
839                    lans[l['vname']].append(l['vnode'])
840            elif lans.has_key(l['vname']):
841                lans[l['vname']].append(l['vnode'])
842            else:
843                links[l['vname']] = [ l['vnode'] ]
844
845
846        # Open up a temporary file for dot to turn into a visualization
847        try:
848            df, dotname = tempfile.mkstemp()
849            dotfile = os.fdopen(df, 'w')
850        except IOError:
851            raise service_error(service_error.internal,
852                    "Failed to open file in genviz")
853
854        # Generate a dot/neato input file from the links, nodes and lans
855        try:
856            print >>dotfile, "graph G {"
857            for n in nodes:
858                print >>dotfile, '\t"%s"' % n
859            for l in links.keys():
860                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
861            for l in lans.keys():
862                for n in lans[l]:
863                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
864            print >>dotfile, "}"
865            dotfile.close()
866        except TypeError:
867            raise service_error(service_error.internal,
868                    "Single endpoint link in vtopo")
869        except IOError:
870            raise service_error(service_error.internal, "Cannot write dot file")
871
872        # Use dot to create a visualization
873        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
874                '-Gpack=true', dotname], stdout=PIPE)
875
876        # Translate dot to vis format
877        vis_nodes = [ ]
878        vis = { 'node': vis_nodes }
879        for line in dot.stdout:
880            m = vis_re.match(line)
881            if m:
882                vn = m.group(1)
883                vis_node = {'name': vn, \
884                        'x': float(m.group(2)),\
885                        'y' : float(m.group(3)),\
886                    }
887                if vn in links.keys() or vn in lans.keys():
888                    vis_node['type'] = 'lan'
889                else:
890                    vis_node['type'] = 'node'
891                vis_nodes.append(vis_node)
892        rv = dot.wait()
893
894        os.remove(dotname)
895        if rv == 0 : return vis
896        else: return None
897
898    def get_access(self, tb, nodes, user, tbparam, master, export_project,
899            access_user):
900        """
901        Get access to testbed through fedd and set the parameters for that tb
902        """
903
904        # XXX This is needless complexity.  It must vanish.
905        translate_attr = {
906            'slavenodestartcmd': 'expstart',
907            'slaveconnectorstartcmd': 'gwstart',
908            'masternodestartcmd': 'mexpstart',
909            'masterconnectorstartcmd': 'mgwstart',
910            'connectorimage': 'gwimage',
911            'connectortype': 'gwtype',
912            'tunnelcfg': 'tun',
913            'tunnelinterface': 'tunnelinterface',
914            'smbshare': 'smbshare',
915        }
916
917        uri = self.tbmap.get(tb, None)
918        if not uri:
919            raise service_error(serice_error.server_config, 
920                    "Unknown testbed: %s" % tb)
921
922        # currently this lumps all users into one service access group
923        service_keys = [ a for u in user \
924                for a in u.get('access', []) \
925                    if a.has_key('sshPubkey')]
926
927        if len(service_keys) == 0:
928            raise service_error(service_error.req, 
929                    "Must have at least one SSH pubkey for services")
930
931
932        for p, u in access_user:
933            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
934                    "to %s") %  ((p or "None"), u, uri))
935
936            if p:
937                # Request with user and project specified
938                req = {\
939                        'destinationTestbed' : { 'uri' : uri },
940                        'project': { 
941                            'name': {'localname': p},
942                            'user': [ {'userID': { 'localname': u } } ],
943                            },
944                        'user':  user,
945                        'allocID' : { 'localname': 'test' },
946                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
947                        'serviceAccess' : service_keys
948                    }
949            else:
950                # Request with only user specified
951                req = {\
952                        'destinationTestbed' : { 'uri' : uri },
953                        'user':  [ {'userID': { 'localname': u } } ],
954                        'allocID' : { 'localname': 'test' },
955                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
956                        'serviceAccess' : service_keys
957                    }
958
959            if tb == master:
960                # NB, the export_project parameter is a dict that includes
961                # the type
962                req['exportProject'] = export_project
963
964            # node resources if any
965            if nodes != None and len(nodes) > 0:
966                rnodes = [ ]
967                for n in nodes:
968                    rn = { }
969                    image, hw, count = n.split(":")
970                    if image: rn['image'] = [ image ]
971                    if hw: rn['hardware'] = [ hw ]
972                    if count and int(count) >0 : rn['count'] = int(count)
973                    rnodes.append(rn)
974                req['resources']= { }
975                req['resources']['node'] = rnodes
976
977            try:
978                if self.local_access.has_key(uri):
979                    # Local access call
980                    req = { 'RequestAccessRequestBody' : req }
981                    r = self.local_access[uri].RequestAccess(req, 
982                            fedid(file=self.cert_file))
983                    r = { 'RequestAccessResponseBody' : r }
984                else:
985                    r = self.call_RequestAccess(uri, req, 
986                            self.cert_file, self.cert_pwd, self.trusted_certs)
987            except service_error, e:
988                if e.code == service_error.access:
989                    self.log.debug("[get_access] Access denied")
990                    r = None
991                    continue
992                else:
993                    raise e
994
995            if r.has_key('RequestAccessResponseBody'):
996                # Through to here we have a valid response, not a fault.
997                # Access denied is a fault, so something better or worse than
998                # access denied has happened.
999                r = r['RequestAccessResponseBody']
1000                self.log.debug("[get_access] Access granted")
1001                break
1002            else:
1003                raise service_error(service_error.protocol,
1004                        "Bad proxy response")
1005       
1006        if not r:
1007            raise service_error(service_error.access, 
1008                    "Access denied by %s (%s)" % (tb, uri))
1009
1010        e = r['emulab']
1011        p = e['project']
1012        tbparam[tb] = { 
1013                "boss": e['boss'],
1014                "host": e['ops'],
1015                "domain": e['domain'],
1016                "fs": e['fileServer'],
1017                "eventserver": e['eventServer'],
1018                "project": unpack_id(p['name']),
1019                "emulab" : e,
1020                "allocID" : r['allocID'],
1021                }
1022        # Make the testbed name be the label the user applied
1023        p['testbed'] = {'localname': tb }
1024
1025        for u in p['user']:
1026            role = u.get('role', None)
1027            if role == 'experimentCreation':
1028                tbparam[tb]['user'] = unpack_id(u['userID'])
1029                break
1030        else:
1031            raise service_error(service_error.internal, 
1032                    "No createExperimentUser from %s" %tb)
1033
1034        for a in e['fedAttr']:
1035            if a['attribute']:
1036                key = translate_attr.get(a['attribute'].lower(), None)
1037                if key:
1038                    tbparam[tb][key]= a['value']
1039       
1040    def release_access(self, tb, aid):
1041        """
1042        Release access to testbed through fedd
1043        """
1044
1045        uri = self.tbmap.get(tb, None)
1046        if not uri:
1047            raise service_error(serice_error.server_config, 
1048                    "Unknown testbed: %s" % tb)
1049
1050        if self.local_access.has_key(uri):
1051            resp = self.local_access[uri].ReleaseAccess(\
1052                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1053                    fedid(file=self.cert_file))
1054            resp = { 'ReleaseAccessResponseBody': resp } 
1055        else:
1056            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1057                    self.cert_file, self.cert_pwd, self.trusted_certs)
1058
1059        # better error coding
1060
1061    def remote_splitter(self, uri, desc, master):
1062
1063        req = {
1064                'description' : { 'ns2description': desc },
1065                'master': master,
1066                'include_fedkit': bool(self.fedkit)
1067            }
1068
1069        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1070                self.trusted_certs)
1071
1072        if r.has_key('Ns2SplitResponseBody'):
1073            r = r['Ns2SplitResponseBody']
1074            if r.has_key('output'):
1075                return r['output'].splitlines()
1076            else:
1077                raise service_error(service_error.protocol, 
1078                        "Bad splitter response (no output)")
1079        else:
1080            raise service_error(service_error.protocol, "Bad splitter response")
1081       
1082    class current_testbed:
1083        """
1084        Object for collecting the current testbed description.  The testbed
1085        description is saved to a file with the local testbed variables
1086        subsittuted line by line.
1087        """
1088        def __init__(self, eid, tmpdir, fedkit):
1089            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1090            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1091            self.current_testbed = None
1092            self.testbed_file = None
1093
1094            self.def_expstart = \
1095                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1096            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1097            self.def_gwstart = \
1098                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1099            self.def_mgwstart = \
1100                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1101            self.def_gwimage = "FBSD61-TUNNEL2";
1102            self.def_gwtype = "pc";
1103
1104            self.eid = eid
1105            self.tmpdir = tmpdir
1106            self.fedkit = fedkit
1107
1108        def __call__(self, line, master, allocated, tbparams):
1109            # Capture testbed topology descriptions
1110            if self.current_testbed == None:
1111                m = self.begin_testbed.match(line)
1112                if m != None:
1113                    self.current_testbed = m.group(1)
1114                    if self.current_testbed == None:
1115                        raise service_error(service_error.req,
1116                                "Bad request format (unnamed testbed)")
1117                    allocated[self.current_testbed] = \
1118                            allocated.get(self.current_testbed,0) + 1
1119                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1120                    if not os.path.exists(tb_dir):
1121                        try:
1122                            os.mkdir(tb_dir)
1123                        except IOError:
1124                            raise service_error(service_error.internal,
1125                                    "Cannot create %s" % tb_dir)
1126                    try:
1127                        self.testbed_file = open("%s/%s.%s.tcl" %
1128                                (tb_dir, self.eid, self.current_testbed), 'w')
1129                    except IOError:
1130                        self.testbed_file = None
1131                    return True
1132                else: return False
1133            else:
1134                m = self.end_testbed.match(line)
1135                if m != None:
1136                    if m.group(1) != self.current_testbed:
1137                        raise service_error(service_error.internal, 
1138                                "Mismatched testbed markers!?")
1139                    if self.testbed_file != None: 
1140                        self.testbed_file.close()
1141                        self.testbed_file = None
1142                    self.current_testbed = None
1143                elif self.testbed_file:
1144                    # Substitute variables and put the line into the local
1145                    # testbed file.
1146                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1147                            self.def_gwtype)
1148                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1149                            self.def_gwimage)
1150                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1151                            self.def_mgwstart)
1152                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1153                            self.def_mexpstart)
1154                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1155                            self.def_gwstart)
1156                    expstart = tbparams[self.current_testbed].get('expstart', 
1157                            self.def_expstart)
1158                    project = tbparams[self.current_testbed].get('project')
1159                    line = re.sub("GWTYPE", gwtype, line)
1160                    line = re.sub("GWIMAGE", gwimage, line)
1161                    if self.current_testbed == master:
1162                        line = re.sub("GWSTART", mgwstart, line)
1163                        line = re.sub("EXPSTART", mexpstart, line)
1164                    else:
1165                        line = re.sub("GWSTART", gwstart, line)
1166                        line = re.sub("EXPSTART", expstart, line)
1167                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1168                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1169                    line = re.sub("EID", self.eid, line)
1170                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1171                            (project, self.eid), line)
1172                    if self.fedkit:
1173                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1174                                line)
1175                    print >>self.testbed_file, line
1176                return True
1177
1178    class allbeds:
1179        """
1180        Process the Allbeds section.  Get access to each federant and save the
1181        parameters in tbparams
1182        """
1183        def __init__(self, get_access):
1184            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1185            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1186            self.in_allbeds = False
1187            self.get_access = get_access
1188
1189        def __call__(self, line, user, tbparams, master, export_project,
1190                access_user):
1191            # Testbed access parameters
1192            if not self.in_allbeds:
1193                if self.begin_allbeds.match(line):
1194                    self.in_allbeds = True
1195                    return True
1196                else:
1197                    return False
1198            else:
1199                if self.end_allbeds.match(line):
1200                    self.in_allbeds = False
1201                else:
1202                    nodes = line.split('|')
1203                    tb = nodes.pop(0)
1204                    self.get_access(tb, nodes, user, tbparams, master,
1205                            export_project, access_user)
1206                return True
1207
1208    class gateways:
1209        def __init__(self, eid, master, tmpdir, gw_pubkey,
1210                gw_secretkey, copy_file, fedkit):
1211            self.begin_gateways = \
1212                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1213            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1214            self.current_gateways = None
1215            self.control_gateway = None
1216            self.active_end = { }
1217
1218            self.eid = eid
1219            self.master = master
1220            self.tmpdir = tmpdir
1221            self.gw_pubkey_base = gw_pubkey
1222            self.gw_secretkey_base = gw_secretkey
1223
1224            self.copy_file = copy_file
1225            self.fedkit = fedkit
1226
1227
1228        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1229                active_end, tbparams, dtb, myname, desthost, type):
1230            """
1231            Produce a gateway configuration file from a gateways line.
1232            """
1233
1234            sproject = tbparams[gw].get('project', 'project')
1235            dproject = tbparams[dtb].get('project', 'project')
1236            sdomain = ".%s.%s%s" % (eid, sproject,
1237                    tbparams[gw].get('domain', ".example.com"))
1238            ddomain = ".%s.%s%s" % (eid, dproject,
1239                    tbparams[dtb].get('domain', ".example.com"))
1240            boss = tbparams[master].get('boss', "boss")
1241            fs = tbparams[master].get('fs', "fs")
1242            event_server = "%s%s" % \
1243                    (tbparams[gw].get('eventserver', "event_server"),
1244                            tbparams[gw].get('domain', "example.com"))
1245            remote_event_server = "%s%s" % \
1246                    (tbparams[dtb].get('eventserver', "event_server"),
1247                            tbparams[dtb].get('domain', "example.com"))
1248            seer_control = "%s%s" % \
1249                    (tbparams[gw].get('control', "control"), sdomain)
1250            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1251
1252            if self.fedkit:
1253                remote_script_dir = "/usr/local/federation/bin"
1254                local_script_dir = "/usr/local/federation/bin"
1255            else:
1256                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1257                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1258
1259            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1260            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1261            tunnel_cfg = tbparams[gw].get("tun", "false")
1262
1263            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1264            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1265
1266            # translate to lower case so the `hostname` hack for specifying
1267            # configuration files works.
1268            conf_file = conf_file.lower();
1269            remote_conf_file = remote_conf_file.lower();
1270
1271            if dtb == master:
1272                active = "false"
1273            elif gw == master:
1274                active = "true"
1275            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1276                active = "false"
1277            else:
1278                active_end['%s-%s' % (gw, dtb)] = 1
1279                active = "true"
1280
1281            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1282            print >>gwconfig, "Active: %s" % active
1283            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1284            if tunnel_iface:
1285                print >>gwconfig, "Interface: %s" % tunnel_iface
1286            print >>gwconfig, "BossName: %s" % boss
1287            print >>gwconfig, "FsName: %s" % fs
1288            print >>gwconfig, "EventServerName: %s" % event_server
1289            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1290            print >>gwconfig, "SeerControl: %s" % seer_control
1291            print >>gwconfig, "Type: %s" % type
1292            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1293            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1294                    local_script_dir
1295            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1296            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1297            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1298                    (remote_conf_dir, remote_conf_file)
1299            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1300            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1301            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1302            gwconfig.close()
1303
1304            return active == "true"
1305
1306        def __call__(self, line, allocated, tbparams):
1307            # Process gateways
1308            if not self.current_gateways:
1309                m = self.begin_gateways.match(line)
1310                if m:
1311                    self.current_gateways = m.group(1)
1312                    if allocated.has_key(self.current_gateways):
1313                        # This test should always succeed
1314                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1315                        if not os.path.exists(tb_dir):
1316                            try:
1317                                os.mkdir(tb_dir)
1318                            except IOError:
1319                                raise service_error(service_error.internal,
1320                                        "Cannot create %s" % tb_dir)
1321                    else:
1322                        # XXX
1323                        self.log.error("[gateways]: Ignoring gateways for " + \
1324                                "unknown testbed %s" % self.current_gateways)
1325                        self.current_gateways = None
1326                    return True
1327                else:
1328                    return False
1329            else:
1330                m = self.end_gateways.match(line)
1331                if m :
1332                    if m.group(1) != self.current_gateways:
1333                        raise service_error(service_error.internal,
1334                                "Mismatched gateway markers!?")
1335                    if self.control_gateway:
1336                        try:
1337                            cc = open("%s/%s/client.conf" %
1338                                    (self.tmpdir, self.current_gateways), 'w')
1339                            print >>cc, "ControlGateway: %s" % \
1340                                    self.control_gateway
1341                            if tbparams[self.master].has_key('smbshare'):
1342                                print >>cc, "SMBSHare: %s" % \
1343                                        tbparams[self.master]['smbshare']
1344                            print >>cc, "ProjectUser: %s" % \
1345                                    tbparams[self.master]['user']
1346                            print >>cc, "ProjectName: %s" % \
1347                                    tbparams[self.master]['project']
1348                            cc.close()
1349                        except IOError:
1350                            raise service_error(service_error.internal,
1351                                    "Error creating client config")
1352                        try:
1353                            cc = open("%s/%s/seer.conf" %
1354                                    (self.tmpdir, self.current_gateways),
1355                                    'w')
1356                            if self.current_gateways != self.master:
1357                                print >>cc, "ControlNode: %s" % \
1358                                        self.control_gateway
1359                            print >>cc, "ExperimentID: %s/%s" % \
1360                                    ( tbparams[self.master]['project'], \
1361                                    self.eid )
1362                            cc.close()
1363                        except IOError:
1364                            raise service_error(service_error.internal,
1365                                    "Error creating seer config")
1366                    else:
1367                        debug.error("[gateways]: No control gateway for %s" %\
1368                                    self.current_gateways)
1369                    self.current_gateways = None
1370                else:
1371                    dtb, myname, desthost, type = line.split(" ")
1372
1373                    if type == "control" or type == "both":
1374                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1375                                self.eid, 
1376                                tbparams[self.current_gateways]['project'],
1377                                tbparams[self.current_gateways]['domain'])
1378                    try:
1379                        active = self.gateway_conf_file(self.current_gateways,
1380                                self.master, self.eid, self.gw_pubkey_base,
1381                                self.gw_secretkey_base,
1382                                self.active_end, tbparams, dtb, myname,
1383                                desthost, type)
1384                    except IOError, e:
1385                        raise service_error(service_error.internal,
1386                                "Failed to write config file for %s" % \
1387                                        self.current_gateway)
1388           
1389                    gw_pubkey = "%s/keys/%s" % \
1390                            (self.tmpdir, self.gw_pubkey_base)
1391                    gw_secretkey = "%s/keys/%s" % \
1392                            (self.tmpdir, self.gw_secretkey_base)
1393
1394                    pkfile = "%s/%s/%s" % \
1395                            ( self.tmpdir, self.current_gateways, 
1396                                    self.gw_pubkey_base)
1397                    skfile = "%s/%s/%s" % \
1398                            ( self.tmpdir, self.current_gateways, 
1399                                    self.gw_secretkey_base)
1400
1401                    if not os.path.exists(pkfile):
1402                        try:
1403                            self.copy_file(gw_pubkey, pkfile)
1404                        except IOError:
1405                            service_error(service_error.internal,
1406                                    "Failed to copy pubkey file")
1407
1408                    if active and not os.path.exists(skfile):
1409                        try:
1410                            self.copy_file(gw_secretkey, skfile)
1411                        except IOError:
1412                            service_error(service_error.internal,
1413                                    "Failed to copy secretkey file")
1414                return True
1415
1416    class shunt_to_file:
1417        """
1418        Simple class to write data between two regexps to a file.
1419        """
1420        def __init__(self, begin, end, filename):
1421            """
1422            Begin shunting on a match of begin, stop on end, send data to
1423            filename.
1424            """
1425            self.begin = re.compile(begin)
1426            self.end = re.compile(end)
1427            self.in_shunt = False
1428            self.file = None
1429            self.filename = filename
1430
1431        def __call__(self, line):
1432            """
1433            Call this on each line in the input that may be shunted.
1434            """
1435            if not self.in_shunt:
1436                if self.begin.match(line):
1437                    self.in_shunt = True
1438                    try:
1439                        self.file = open(self.filename, "w")
1440                    except:
1441                        self.file = None
1442                        raise
1443                    return True
1444                else:
1445                    return False
1446            else:
1447                if self.end.match(line):
1448                    if self.file: 
1449                        self.file.close()
1450                        self.file = None
1451                    self.in_shunt = False
1452                else:
1453                    if self.file:
1454                        print >>self.file, line
1455                return True
1456
1457    class shunt_to_list:
1458        """
1459        Same interface as shunt_to_file.  Data collected in self.list, one list
1460        element per line.
1461        """
1462        def __init__(self, begin, end):
1463            self.begin = re.compile(begin)
1464            self.end = re.compile(end)
1465            self.in_shunt = False
1466            self.list = [ ]
1467       
1468        def __call__(self, line):
1469            if not self.in_shunt:
1470                if self.begin.match(line):
1471                    self.in_shunt = True
1472                    return True
1473                else:
1474                    return False
1475            else:
1476                if self.end.match(line):
1477                    self.in_shunt = False
1478                else:
1479                    self.list.append(line)
1480                return True
1481
1482    class shunt_to_string:
1483        """
1484        Same interface as shunt_to_file.  Data collected in self.str, all in
1485        one string.
1486        """
1487        def __init__(self, begin, end):
1488            self.begin = re.compile(begin)
1489            self.end = re.compile(end)
1490            self.in_shunt = False
1491            self.str = ""
1492       
1493        def __call__(self, line):
1494            if not self.in_shunt:
1495                if self.begin.match(line):
1496                    self.in_shunt = True
1497                    return True
1498                else:
1499                    return False
1500            else:
1501                if self.end.match(line):
1502                    self.in_shunt = False
1503                else:
1504                    self.str += line
1505                return True
1506
1507    def create_experiment(self, req, fid):
1508        """
1509        The external interface to experiment creation called from the
1510        dispatcher.
1511
1512        Creates a working directory, splits the incoming description using the
1513        splitter script and parses out the avrious subsections using the
1514        lcasses above.  Once each sub-experiment is created, use pooled threads
1515        to instantiate them and start it all up.
1516        """
1517
1518        if not self.auth.check_attribute(fid, 'create'):
1519            raise service_error(service_error.access, "Create access denied")
1520
1521        try:
1522            tmpdir = tempfile.mkdtemp(prefix="split-")
1523        except IOError:
1524            raise service_error(service_error.internal, "Cannot create tmp dir")
1525
1526        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1527        gw_secretkey_base = "fed.%s" % self.ssh_type
1528        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1529        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1530        tclfile = tmpdir + "/experiment.tcl"
1531        tbparams = { }
1532        try:
1533            access_user = self.accessdb[fid]
1534        except KeyError:
1535            raise service_error(service_error.internal,
1536                    "Access map and authorizer out of sync in " + \
1537                            "create_experiment for fedid %s"  % fid)
1538
1539        pid = "dummy"
1540        gid = "dummy"
1541        # XXX
1542        fail_soft = False
1543
1544        try:
1545            os.mkdir(tmpdir+"/keys")
1546        except OSError:
1547            raise service_error(service_error.internal,
1548                    "Can't make temporary dir")
1549
1550        req = req.get('CreateRequestBody', None)
1551        if not req:
1552            raise service_error(service_error.req,
1553                    "Bad request format (no CreateRequestBody)")
1554        # The tcl parser needs to read a file so put the content into that file
1555        descr=req.get('experimentdescription', None)
1556        if descr:
1557            file_content=descr.get('ns2description', None)
1558            if file_content:
1559                try:
1560                    f = open(tclfile, 'w')
1561                    f.write(file_content)
1562                    f.close()
1563                except IOError:
1564                    raise service_error(service_error.internal,
1565                            "Cannot write temp experiment description")
1566            else:
1567                raise service_error(service_error.req, 
1568                        "Only ns2descriptions supported")
1569        else:
1570            raise service_error(service_error.req, "No experiment description")
1571
1572        if req.has_key('experimentID') and \
1573                req['experimentID'].has_key('localname'):
1574            eid = req['experimentID']['localname']
1575            self.state_lock.acquire()
1576            while (self.state.has_key(eid)):
1577                eid += random.choice(string.ascii_letters)
1578            # To avoid another thread picking this localname
1579            self.state[eid] = "placeholder"
1580            self.state_lock.release()
1581        else:
1582            eid = self.exp_stem
1583            for i in range(0,5):
1584                eid += random.choice(string.ascii_letters)
1585            self.state_lock.acquire()
1586            while (self.state.has_key(eid)):
1587                eid = self.exp_stem
1588                for i in range(0,5):
1589                    eid += random.choice(string.ascii_letters)
1590            # To avoid another thread picking this localname
1591            self.state[eid] = "placeholder"
1592            self.state_lock.release()
1593
1594        try: 
1595            # This catches exceptions to clear the placeholder if necessary
1596            try:
1597                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1598            except ValueError:
1599                raise service_error(service_error.server_config, 
1600                        "Bad key type (%s)" % self.ssh_type)
1601
1602            user = req.get('user', None)
1603            if user == None:
1604                raise service_error(service_error.req, "No user")
1605
1606            master = req.get('master', None)
1607            if not master:
1608                raise service_error(service_error.req,
1609                        "No master testbed label")
1610            export_project = req.get('exportProject', None)
1611            if not export_project:
1612                raise service_error(service_error.req, "No export project")
1613           
1614            if self.splitter_url:
1615                self.log.debug("Calling remote splitter at %s" % \
1616                        self.splitter_url)
1617                split_data = self.remote_splitter(self.splitter_url,
1618                        file_content, master)
1619            else:
1620                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1621                    str(self.muxmax), '-m', master]
1622
1623                if self.fedkit:
1624                    tclcmd.append('-k')
1625
1626                tclcmd.extend([pid, gid, eid, tclfile])
1627
1628                self.log.debug("running local splitter %s", " ".join(tclcmd))
1629                tclparser = Popen(tclcmd, stdout=PIPE)
1630                split_data = tclparser.stdout
1631
1632            allocated = { }         # Testbeds we can access
1633            started = { }           # Testbeds where a sub-experiment started
1634                                # successfully
1635
1636            # Objects to parse the splitter output (defined above)
1637            parse_current_testbed = self.current_testbed(eid, tmpdir,
1638                    self.fedkit)
1639            parse_allbeds = self.allbeds(self.get_access)
1640            parse_gateways = self.gateways(eid, master, tmpdir,
1641                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1642                    self.fedkit)
1643            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1644                        "^#\s+End\s+Vtopo")
1645            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1646                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1647            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1648                    "^#\s+End\s+tarfiles")
1649            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1650                    "^#\s+End\s+rpms")
1651
1652            # Working on the split data
1653            for line in split_data:
1654                line = line.rstrip()
1655                if parse_current_testbed(line, master, allocated, tbparams):
1656                    continue
1657                elif parse_allbeds(line, user, tbparams, master, export_project,
1658                        access_user):
1659                    continue
1660                elif parse_gateways(line, allocated, tbparams):
1661                    continue
1662                elif parse_vtopo(line):
1663                    continue
1664                elif parse_hostnames(line):
1665                    continue
1666                elif parse_tarfiles(line):
1667                    continue
1668                elif parse_rpms(line):
1669                    continue
1670                else:
1671                    raise service_error(service_error.internal, 
1672                            "Bad tcl parse? %s" % line)
1673            # Virtual topology and visualization
1674            vtopo = self.gentopo(parse_vtopo.str)
1675            if not vtopo:
1676                raise service_error(service_error.internal, 
1677                        "Failed to generate virtual topology")
1678
1679            vis = self.genviz(vtopo)
1680            if not vis:
1681                raise service_error(service_error.internal, 
1682                        "Failed to generate visualization")
1683           
1684            # save federant information
1685            for k in allocated.keys():
1686                tbparams[k]['federant'] = {\
1687                        'name': [ { 'localname' : eid} ],\
1688                        'emulab': tbparams[k]['emulab'],\
1689                        'allocID' : tbparams[k]['allocID'],\
1690                        'master' : k == master,\
1691                    }
1692
1693
1694            # Copy tarfiles and rpms needed at remote sites into a staging area
1695            try:
1696                if self.fedkit:
1697                    parse_tarfiles.list.append(self.fedkit)
1698                for t in parse_tarfiles.list:
1699                    if not os.path.exists("%s/tarfiles" % tmpdir):
1700                        os.mkdir("%s/tarfiles" % tmpdir)
1701                    self.copy_file(t, "%s/tarfiles/%s" % \
1702                            (tmpdir, os.path.basename(t)))
1703                for r in parse_rpms.list:
1704                    if not os.path.exists("%s/rpms" % tmpdir):
1705                        os.mkdir("%s/rpms" % tmpdir)
1706                    self.copy_file(r, "%s/rpms/%s" % \
1707                            (tmpdir, os.path.basename(r)))
1708            except IOError, e:
1709                raise service_error(service_error.internal, 
1710                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1711
1712        except service_error, e:
1713            # If something goes wrong in the parse (usually an access error)
1714            # clear the placeholder state.  From here on out the code delays
1715            # exceptions.
1716            self.state_lock.acquire()
1717            del self.state[eid]
1718            self.state_lock.release()
1719            raise e
1720
1721        thread_pool = self.thread_pool(self.nthreads)
1722        threads = [ ]
1723
1724        for tb in [ k for k in allocated.keys() if k != master]:
1725            # Create and start a thread to start the segment, and save it to
1726            # get the return value later
1727            thread_pool.wait_for_slot()
1728            t  = self.pooled_thread(target=self.start_segment, 
1729                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1730                    pdata=thread_pool, trace_file=self.trace_file)
1731            threads.append(t)
1732            t.start()
1733
1734        # Wait until all finish
1735        thread_pool.wait_for_all_done()
1736
1737        # If none failed, start the master
1738        failed = [ t.getName() for t in threads if not t.rv ]
1739
1740        if len(failed) == 0:
1741            if not self.start_segment(master, eid, tbparams, tmpdir):
1742                failed.append(master)
1743
1744        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1745        # If one failed clean up, unless fail_soft is set
1746        if failed:
1747            if not fail_soft:
1748                thread_pool.clear()
1749                for tb in succeeded:
1750                    # Create and start a thread to stop the segment
1751                    thread_pool.wait_for_slot()
1752                    t  = self.pooled_thread(target=self.stop_segment, 
1753                            args=(tb, eid, tbparams), name=tb,
1754                            pdata=thread_pool, trace_file=self.trace_file)
1755                    t.start()
1756                # Wait until all finish
1757                thread_pool.wait_for_all_done()
1758
1759                # release the allocations
1760                for tb in tbparams.keys():
1761                    self.release_access(tb, tbparams[tb]['allocID'])
1762                # Remove the placeholder
1763                self.state_lock.acquire()
1764                del self.state[eid]
1765                self.state_lock.release()
1766
1767                raise service_error(service_error.federant,
1768                    "Swap in failed on %s" % ",".join(failed))
1769        else:
1770            self.log.info("[start_segment]: Experiment %s started" % eid)
1771
1772        # Generate an ID for the experiment (slice) and a certificate that the
1773        # allocator can use to prove they own it.  We'll ship it back through
1774        # the encrypted connection.
1775        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1776
1777        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1778
1779        # Walk up tmpdir, deleting as we go
1780        for path, dirs, files in os.walk(tmpdir, topdown=False):
1781            for f in files:
1782                os.remove(os.path.join(path, f))
1783            for d in dirs:
1784                os.rmdir(os.path.join(path, d))
1785        os.rmdir(tmpdir)
1786
1787        # The deepcopy prevents the allocation ID and other binaries from being
1788        # translated into other formats
1789        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1790                for tb in tbparams.keys() \
1791                    if tbparams[tb].has_key('federant') ],\
1792                    'vtopo': vtopo,\
1793                    'vis' : vis,
1794                    'experimentID' : [\
1795                            { 'fedid': copy.copy(expid) }, \
1796                            { 'localname': eid },\
1797                        ],\
1798                    'experimentAccess': { 'X509' : expcert },\
1799                }
1800        # remove the allocationID info from each federant
1801        for f in resp['federant']:
1802            if f.has_key('allocID'): del f['allocID']
1803
1804        # Insert the experiment into our state and update the disk copy
1805        self.state_lock.acquire()
1806        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1807                for tb in tbparams.keys() \
1808                    if tbparams[tb].has_key('federant') ],\
1809                    'vtopo': vtopo,\
1810                    'vis' : vis,
1811                    'owner': fid,
1812                    'experimentID' : [\
1813                            { 'fedid': expid }, { 'localname': eid },\
1814                        ],\
1815                }
1816        self.state[eid] = self.state[expid]
1817        if self.state_filename: self.write_state()
1818        self.state_lock.release()
1819
1820        self.auth.set_attribute(fid, expid)
1821        self.auth.set_attribute(expid, expid)
1822
1823        if not failed:
1824            return resp
1825        else:
1826            raise service_error(service_error.partial, \
1827                    "Partial swap in on %s" % ",".join(succeeded))
1828
1829    def check_experiment_access(self, fid, key):
1830        """
1831        Confirm that the fid has access to the experiment.  Though a request
1832        may be made in terms of a local name, the access attribute is always
1833        the experiment's fedid.
1834        """
1835        if not isinstance(key, fedid):
1836            self.state_lock.acquire()
1837            if self.state.has_key(key):
1838                try:
1839                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1840                            if f.has_key('fedid') ]
1841                except KeyError:
1842                    self.state_lock.release()
1843                    raise service_error(service_error.internal, 
1844                            "No fedid for experiment %s when checking " +\
1845                                    "access(!?)" % key)
1846                if len(kl) == 1:
1847                    key = kl[0]
1848                else:
1849                    self.state_lock.release()
1850                    raise service_error(service_error.internal, 
1851                            "multiple fedids for experiment %s when " +\
1852                                    "checking access(!?)" % key)
1853            else:
1854                self.state_lock.release()
1855                raise service_error(service_error.access, "Access Denied")
1856            self.state_lock.release()
1857
1858        if self.auth.check_attribute(fid, key):
1859            return True
1860        else:
1861            raise service_error(service_error.access, "Access Denied")
1862
1863
1864
1865    def get_vtopo(self, req, fid):
1866        """
1867        Return the stored virtual topology for this experiment
1868        """
1869        rv = None
1870
1871        req = req.get('VtopoRequestBody', None)
1872        if not req:
1873            raise service_error(service_error.req,
1874                    "Bad request format (no VtopoRequestBody)")
1875        exp = req.get('experiment', None)
1876        if exp:
1877            if exp.has_key('fedid'):
1878                key = exp['fedid']
1879                keytype = "fedid"
1880            elif exp.has_key('localname'):
1881                key = exp['localname']
1882                keytype = "localname"
1883            else:
1884                raise service_error(service_error.req, "Unknown lookup type")
1885        else:
1886            raise service_error(service_error.req, "No request?")
1887
1888        self.check_experiment_access(fid, key)
1889
1890        self.state_lock.acquire()
1891        if self.state.has_key(key):
1892            rv = { 'experiment' : {keytype: key },\
1893                    'vtopo': self.state[key]['vtopo'],\
1894                }
1895        self.state_lock.release()
1896
1897        if rv: return rv
1898        else: raise service_error(service_error.req, "No such experiment")
1899
1900    def get_vis(self, req, fid):
1901        """
1902        Return the stored visualization for this experiment
1903        """
1904        rv = None
1905
1906        req = req.get('VisRequestBody', None)
1907        if not req:
1908            raise service_error(service_error.req,
1909                    "Bad request format (no VisRequestBody)")
1910        exp = req.get('experiment', None)
1911        if exp:
1912            if exp.has_key('fedid'):
1913                key = exp['fedid']
1914                keytype = "fedid"
1915            elif exp.has_key('localname'):
1916                key = exp['localname']
1917                keytype = "localname"
1918            else:
1919                raise service_error(service_error.req, "Unknown lookup type")
1920        else:
1921            raise service_error(service_error.req, "No request?")
1922
1923        self.check_experiment_access(fid, key)
1924
1925        self.state_lock.acquire()
1926        if self.state.has_key(key):
1927            rv =  { 'experiment' : {keytype: key },\
1928                    'vis': self.state[key]['vis'],\
1929                    }
1930        self.state_lock.release()
1931
1932        if rv: return rv
1933        else: raise service_error(service_error.req, "No such experiment")
1934
1935    def get_info(self, req, fid):
1936        """
1937        Return all the stored info about this experiment
1938        """
1939        rv = None
1940
1941        req = req.get('InfoRequestBody', None)
1942        if not req:
1943            raise service_error(service_error.req,
1944                    "Bad request format (no VisRequestBody)")
1945        exp = req.get('experiment', None)
1946        if exp:
1947            if exp.has_key('fedid'):
1948                key = exp['fedid']
1949                keytype = "fedid"
1950            elif exp.has_key('localname'):
1951                key = exp['localname']
1952                keytype = "localname"
1953            else:
1954                raise service_error(service_error.req, "Unknown lookup type")
1955        else:
1956            raise service_error(service_error.req, "No request?")
1957
1958        self.check_experiment_access(fid, key)
1959
1960        # The state may be massaged by the service function that called
1961        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1962        # state.
1963        self.state_lock.acquire()
1964        if self.state.has_key(key):
1965            rv = copy.deepcopy(self.state[key])
1966        self.state_lock.release()
1967        # Remove the owner info
1968        del rv['owner']
1969        # remove the allocationID info from each federant
1970        for f in rv['federant']:
1971            if f.has_key('allocID'): del f['allocID']
1972
1973        if rv: return rv
1974        else: raise service_error(service_error.req, "No such experiment")
1975
1976
1977    def terminate_experiment(self, req, fid):
1978        """
1979        Swap this experiment out on the federants and delete the shared
1980        information
1981        """
1982        tbparams = { }
1983        req = req.get('TerminateRequestBody', None)
1984        if not req:
1985            raise service_error(service_error.req,
1986                    "Bad request format (no TerminateRequestBody)")
1987        exp = req.get('experiment', None)
1988        if exp:
1989            if exp.has_key('fedid'):
1990                key = exp['fedid']
1991                keytype = "fedid"
1992            elif exp.has_key('localname'):
1993                key = exp['localname']
1994                keytype = "localname"
1995            else:
1996                raise service_error(service_error.req, "Unknown lookup type")
1997        else:
1998            raise service_error(service_error.req, "No request?")
1999
2000        self.check_experiment_access(fid, key)
2001
2002        self.state_lock.acquire()
2003        fed_exp = self.state.get(key, None)
2004
2005        if fed_exp:
2006            # This branch of the conditional holds the lock to generate a
2007            # consistent temporary tbparams variable to deallocate experiments.
2008            # It releases the lock to do the deallocations and reacquires it to
2009            # remove the experiment state when the termination is complete.
2010            ids = []
2011            #  experimentID is a list of dicts that are self-describing
2012            #  identifiers.  This finds all the fedids and localnames - the
2013            #  keys of self.state - and puts them into ids.
2014            for id in fed_exp.get('experimentID', []):
2015                if id.has_key('fedid'): ids.append(id['fedid'])
2016                if id.has_key('localname'): ids.append(id['localname'])
2017
2018            # Construct enough of the tbparams to make the stop_segment calls
2019            # work
2020            for fed in fed_exp['federant']:
2021                try:
2022                    for e in fed['name']:
2023                        eid = e.get('localname', None)
2024                        if eid: break
2025                    else:
2026                        continue
2027
2028                    p = fed['emulab']['project']
2029
2030                    project = p['name']['localname']
2031                    tb = p['testbed']['localname']
2032                    user = p['user'][0]['userID']['localname']
2033
2034                    domain = fed['emulab']['domain']
2035                    host  = fed['emulab']['ops']
2036                    aid = fed['allocID']
2037                except KeyError, e:
2038                    continue
2039                tbparams[tb] = {\
2040                        'user': user,\
2041                        'domain': domain,\
2042                        'project': project,\
2043                        'host': host,\
2044                        'eid': eid,\
2045                        'aid': aid,\
2046                    }
2047            self.state_lock.release()
2048
2049            # Stop everyone.
2050            thread_pool = self.thread_pool(self.nthreads)
2051            for tb in tbparams.keys():
2052                # Create and start a thread to stop the segment
2053                thread_pool.wait_for_slot()
2054                t  = self.pooled_thread(target=self.stop_segment, 
2055                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2056                        pdata=thread_pool, trace_file=self.trace_file)
2057                t.start()
2058            # Wait for completions
2059            thread_pool.wait_for_all_done()
2060
2061            # release the allocations
2062            for tb in tbparams.keys():
2063                self.release_access(tb, tbparams[tb]['aid'])
2064
2065            # Remove the terminated experiment
2066            self.state_lock.acquire()
2067            for id in ids:
2068                if self.state.has_key(id): del self.state[id]
2069
2070            if self.state_filename: self.write_state()
2071            self.state_lock.release()
2072
2073            return { 'experiment': exp }
2074        else:
2075            # Don't forget to release the lock
2076            self.state_lock.release()
2077            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.