source: fedd/federation/experiment_control.py @ 2b7d768

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 2b7d768 was 5ae3857, checked in by Ted Faber <faber@…>, 15 years ago

terminate works

  • Property mode set to 100644
File size: 130.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_TerminateSegment = service_caller('TerminateSegment')
200    call_Ns2Split = service_caller('Ns2Split')
201
202    def __init__(self, config=None, auth=None):
203        """
204        Intialize the various attributes, most from the config object
205        """
206
207        def parse_tarfile_list(tf):
208            """
209            Parse a tarfile list from the configuration.  This is a set of
210            paths and tarfiles separated by spaces.
211            """
212            rv = [ ]
213            if tf is not None:
214                tl = tf.split()
215                while len(tl) > 1:
216                    p, t = tl[0:2]
217                    del tl[0:2]
218                    rv.append((p, t))
219            return rv
220
221        self.thread_with_rv = experiment_control_local.pooled_thread
222        self.thread_pool = experiment_control_local.thread_pool
223        self.list_log = experiment_control_local.list_log
224
225        self.cert_file = config.get("experiment_control", "cert_file")
226        if self.cert_file:
227            self.cert_pwd = config.get("experiment_control", "cert_pwd")
228        else:
229            self.cert_file = config.get("globals", "cert_file")
230            self.cert_pwd = config.get("globals", "cert_pwd")
231
232        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
233                or config.get("globals", "trusted_certs")
234
235        self.repodir = config.get("experiment_control", "repodir")
236
237        self.exp_stem = "fed-stem"
238        self.log = logging.getLogger("fedd.experiment_control")
239        set_log_level(config, "experiment_control", self.log)
240        self.muxmax = 2
241        self.nthreads = 2
242        self.randomize_experiments = False
243
244        self.splitter = None
245        self.ssh_keygen = "/usr/bin/ssh-keygen"
246        self.ssh_identity_file = None
247
248
249        self.debug = config.getboolean("experiment_control", "create_debug")
250        self.state_filename = config.get("experiment_control", 
251                "experiment_state")
252        self.splitter_url = config.get("experiment_control", "splitter_uri")
253        self.fedkit = parse_tarfile_list(\
254                config.get("experiment_control", "fedkit"))
255        self.gatewaykit = parse_tarfile_list(\
256                config.get("experiment_control", "gatewaykit"))
257        accessdb_file = config.get("experiment_control", "accessdb")
258
259        self.ssh_pubkey_file = config.get("experiment_control", 
260                "ssh_pubkey_file")
261        self.ssh_privkey_file = config.get("experiment_control",
262                "ssh_privkey_file")
263        # NB for internal master/slave ops, not experiment setup
264        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
265
266        self.overrides = set([])
267        ovr = config.get('experiment_control', 'overrides')
268        if ovr:
269            for o in ovr.split(","):
270                o = o.strip()
271                if o.startswith('fedid:'): o = o[len('fedid:'):]
272                self.overrides.add(fedid(hexstr=o))
273
274        self.state = { }
275        self.state_lock = Lock()
276        self.tclsh = "/usr/local/bin/otclsh"
277        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
278                config.get("experiment_control", "tcl_splitter",
279                        "/usr/testbed/lib/ns2ir/parse.tcl")
280        mapdb_file = config.get("experiment_control", "mapdb")
281        self.trace_file = sys.stderr
282
283        self.def_expstart = \
284                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
285                "/tmp/federate";
286        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
287                "FEDDIR/hosts";
288        self.def_gwstart = \
289                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
290                "/tmp/bridge.log";
291        self.def_mgwstart = \
292                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
293                "/tmp/bridge.log";
294        self.def_gwimage = "FBSD61-TUNNEL2";
295        self.def_gwtype = "pc";
296        self.local_access = { }
297
298        if auth:
299            self.auth = auth
300        else:
301            self.log.error(\
302                    "[access]: No authorizer initialized, creating local one.")
303            auth = authorizer()
304
305
306        if self.ssh_pubkey_file:
307            try:
308                f = open(self.ssh_pubkey_file, 'r')
309                self.ssh_pubkey = f.read()
310                f.close()
311            except IOError:
312                raise service_error(service_error.internal,
313                        "Cannot read sshpubkey")
314        else:
315            raise service_error(service_error.internal, 
316                    "No SSH public key file?")
317
318        if not self.ssh_privkey_file:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322
323        if mapdb_file:
324            self.read_mapdb(mapdb_file)
325        else:
326            self.log.warn("[experiment_control] No testbed map, using defaults")
327            self.tbmap = { 
328                    'deter':'https://users.isi.deterlab.net:23235',
329                    'emulab':'https://users.isi.deterlab.net:23236',
330                    'ucb':'https://users.isi.deterlab.net:23237',
331                    }
332
333        if accessdb_file:
334                self.read_accessdb(accessdb_file)
335        else:
336            raise service_error(service_error.internal,
337                    "No accessdb specified in config")
338
339        # Grab saved state.  OK to do this w/o locking because it's read only
340        # and only one thread should be in existence that can see self.state at
341        # this point.
342        if self.state_filename:
343            self.read_state()
344
345        # Dispatch tables
346        self.soap_services = {\
347                'Create': soap_handler('Create', self.new_create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.new_terminate_experiment),
354        }
355
356        self.xmlrpc_services = {\
357                'Create': xmlrpc_handler('Create', self.new_create_experiment),
358                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
359                'Vis': xmlrpc_handler('Vis', self.get_vis),
360                'Info': xmlrpc_handler('Info', self.get_info),
361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
362                'Terminate': xmlrpc_handler('Terminate',
363                    self.new_terminate_experiment),
364        }
365
366    def copy_file(self, src, dest, size=1024):
367        """
368        Exceedingly simple file copy.
369        """
370        s = open(src,'r')
371        d = open(dest, 'w')
372
373        buf = "x"
374        while buf != "":
375            buf = s.read(size)
376            d.write(buf)
377        s.close()
378        d.close()
379
380    # Call while holding self.state_lock
381    def write_state(self):
382        """
383        Write a new copy of experiment state after copying the existing state
384        to a backup.
385
386        State format is a simple pickling of the state dictionary.
387        """
388        if os.access(self.state_filename, os.W_OK):
389            self.copy_file(self.state_filename, \
390                    "%s.bak" % self.state_filename)
391        try:
392            f = open(self.state_filename, 'w')
393            pickle.dump(self.state, f)
394        except IOError, e:
395            self.log.error("Can't write file %s: %s" % \
396                    (self.state_filename, e))
397        except pickle.PicklingError, e:
398            self.log.error("Pickling problem: %s" % e)
399        except TypeError, e:
400            self.log.error("Pickling problem (TypeError): %s" % e)
401
402    # Call while holding self.state_lock
403    def read_state(self):
404        """
405        Read a new copy of experiment state.  Old state is overwritten.
406
407        State format is a simple pickling of the state dictionary.
408        """
409       
410        def get_experiment_id(state):
411            """
412            Pull the fedid experimentID out of the saved state.  This is kind
413            of a gross walk through the dict.
414            """
415
416            if state.has_key('experimentID'):
417                for e in state['experimentID']:
418                    if e.has_key('fedid'):
419                        return e['fedid']
420                else:
421                    return None
422            else:
423                return None
424
425        def get_alloc_ids(state):
426            """
427            Pull the fedids of the identifiers of each allocation from the
428            state.  Again, a dict dive that's best isolated.
429            """
430
431            return [ f['allocID']['fedid'] 
432                    for f in state.get('federant',[]) \
433                        if f.has_key('allocID') and \
434                            f['allocID'].has_key('fedid')]
435
436
437        try:
438            f = open(self.state_filename, "r")
439            self.state = pickle.load(f)
440            self.log.debug("[read_state]: Read state from %s" % \
441                    self.state_filename)
442        except IOError, e:
443            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
444                    % (self.state_filename, e))
445        except pickle.UnpicklingError, e:
446            self.log.warning(("[read_state]: No saved state: " + \
447                    "Unpickling failed: %s") % e)
448       
449        for s in self.state.values():
450            try:
451
452                eid = get_experiment_id(s)
453                if eid : 
454                    # Give the owner rights to the experiment
455                    self.auth.set_attribute(s['owner'], eid)
456                    # And holders of the eid as well
457                    self.auth.set_attribute(eid, eid)
458                    # allow overrides to control experiments as well
459                    for o in self.overrides:
460                        self.auth.set_attribute(o, eid)
461                    # Set permissions to allow reading of the software repo, if
462                    # any, as well.
463                    for a in get_alloc_ids(s):
464                        self.auth.set_attribute(a, 'repo/%s' % eid)
465                else:
466                    raise KeyError("No experiment id")
467            except KeyError, e:
468                self.log.warning("[read_state]: State ownership or identity " +\
469                        "misformatted in %s: %s" % (self.state_filename, e))
470
471
472    def read_accessdb(self, accessdb_file):
473        """
474        Read the mapping from fedids that can create experiments to their name
475        in the 3-level access namespace.  All will be asserted from this
476        testbed and can include the local username and porject that will be
477        asserted on their behalf by this fedd.  Each fedid is also added to the
478        authorization system with the "create" attribute.
479        """
480        self.accessdb = {}
481        # These are the regexps for parsing the db
482        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
483        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
484                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
485        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
486                "\s*->\s*(" + name_expr + ")\s*$")
487        lineno = 0
488
489        # Parse the mappings and store in self.authdb, a dict of
490        # fedid -> (proj, user)
491        try:
492            f = open(accessdb_file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if len(line) == 0 or line.startswith('#'):
497                    continue
498                m = project_line.match(line)
499                if m:
500                    fid = fedid(hexstr=m.group(1))
501                    project, user = m.group(2,3)
502                    if not self.accessdb.has_key(fid):
503                        self.accessdb[fid] = []
504                    self.accessdb[fid].append((project, user))
505                    continue
506
507                m = user_line.match(line)
508                if m:
509                    fid = fedid(hexstr=m.group(1))
510                    project = None
511                    user = m.group(2)
512                    if not self.accessdb.has_key(fid):
513                        self.accessdb[fid] = []
514                    self.accessdb[fid].append((project, user))
515                    continue
516                self.log.warn("[experiment_control] Error parsing access " +\
517                        "db %s at line %d" %  (accessdb_file, lineno))
518        except IOError:
519            raise service_error(service_error.internal,
520                    "Error opening/reading %s as experiment " +\
521                            "control accessdb" %  accessdb_file)
522        f.close()
523
524        # Initialize the authorization attributes
525        for fid in self.accessdb.keys():
526            self.auth.set_attribute(fid, 'create')
527
528    def read_mapdb(self, file):
529        """
530        Read a simple colon separated list of mappings for the
531        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
532        """
533
534        self.tbmap = { }
535        lineno =0
536        try:
537            f = open(file, "r")
538            for line in f:
539                lineno += 1
540                line = line.strip()
541                if line.startswith('#') or len(line) == 0:
542                    continue
543                try:
544                    label, url = line.split(':', 1)
545                    self.tbmap[label] = url
546                except ValueError, e:
547                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
548                            "map db: %s %s" % (lineno, line, e))
549        except IOError, e:
550            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
551                    "open %s: %s" % (file, e))
552        f.close()
553
554    class emulab_segment:
555        def __init__(self, log=None, keyfile=None, debug=False):
556            self.log = log or logging.getLogger(\
557                    'fedd.experiment_control.emulab_segment')
558            self.ssh_privkey_file = keyfile
559            self.debug = debug
560            self.ssh_exec="/usr/bin/ssh"
561            self.scp_exec = "/usr/bin/scp"
562            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
563
564        def scp_file(self, file, user, host, dest=""):
565            """
566            scp a file to the remote host.  If debug is set the action is only
567            logged.
568            """
569
570            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
571                    '-o', 'StrictHostKeyChecking yes', '-i', 
572                    self.ssh_privkey_file, file, 
573                    "%s@%s:%s" % (user, host, dest)]
574            rv = 0
575
576            try:
577                dnull = open("/dev/null", "w")
578            except IOError:
579                self.log.debug("[ssh_file]: failed to open " + \
580                        "/dev/null for redirect")
581                dnull = Null
582
583            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
584            if not self.debug:
585                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
586                        close_fds=True)
587
588            return rv == 0
589
590        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
591            """
592            Run a remote command on host as user.  If debug is set, the action
593            is only logged.  Commands are run without stdin, to avoid stray
594            SIGTTINs.
595            """
596            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
597                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
598                    (self.ssh_exec, self.ssh_privkey_file, 
599                            user, host, cmd)
600
601            try:
602                dnull = open("/dev/null", "w")
603            except IOError:
604                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
605                        "for redirect")
606                dnull = Null
607
608            self.log.debug("[ssh_cmd]: %s" % sh_str)
609            if not self.debug:
610                if dnull:
611                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
612                            close_fds=True)
613                else:
614                    sub = Popen(sh_str, shell=True,
615                            close_fds=True)
616                if timeout:
617                    i = 0
618                    rv = sub.poll()
619                    while i < timeout:
620                        if rv is not None: break
621                        else:
622                            time.sleep(1)
623                            rv = sub.poll()
624                            i += 1
625                    else:
626                        self.log.debug("Process exceeded runtime: %s" % sh_str)
627                        os.kill(sub.pid, signal.SIGKILL)
628                        raise self.ssh_cmd_timeout();
629                    return rv == 0
630                else:
631                    return sub.wait() == 0
632            else:
633                if timeout == 0:
634                    self.log.debug("debug timeout raised on %s " % sh_str)
635                    raise self.ssh_cmd_timeout()
636                else:
637                    return True
638
639    class start_segment(emulab_segment):
640        def __init__(self, log=None, keyfile=None, debug=False):
641            experiment_control_local.emulab_segment.__init__(self,
642                    log=log, keyfile=keyfile, debug=debug)
643
644        def create_config_tree(self, src_dir, dest_dir, script):
645            """
646            Append commands to script that will create the directory hierarchy
647            on the remote federant.
648            """
649
650            if os.path.isdir(src_dir):
651                print >>script, "mkdir -p %s" % dest_dir
652                print >>script, "chmod 770 %s" % dest_dir
653
654                for f in os.listdir(src_dir):
655                    if os.path.isdir(f):
656                        self.create_config_tree("%s/%s" % (src_dir, f), 
657                                "%s/%s" % (dest_dir, f), script)
658            else:
659                self.log.debug("[create_config_tree]: Not a directory: %s" \
660                        % src_dir)
661
662        def ship_configs(self, host, user, src_dir, dest_dir):
663            """
664            Copy federant-specific configuration files to the federant.
665            """
666            for f in os.listdir(src_dir):
667                if os.path.isdir(f):
668                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
669                            "%s/%s" % (dest_dir, f)):
670                        return False
671                else:
672                    if not self.scp_file("%s/%s" % (src_dir, f), 
673                            user, host, dest_dir):
674                        return False
675            return True
676
677        def get_state(self, user, host, tb, pid, eid):
678            # command to test experiment state
679            expinfo_exec = "/usr/testbed/bin/expinfo" 
680            # Regular expressions to parse the expinfo response
681            state_re = re.compile("State:\s+(\w+)")
682            no_exp_re = re.compile("^No\s+such\s+experiment")
683            swapping_re = re.compile("^No\s+information\s+available.")
684            state = None    # Experiment state parsed from expinfo
685            # The expinfo ssh command.  Note the identity restriction to use
686            # only the identity provided in the pubkey given.
687            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
688                    'StrictHostKeyChecking yes', '-i', 
689                    self.ssh_privkey_file, "%s@%s" % (user, host), 
690                    expinfo_exec, pid, eid]
691
692            dev_null = None
693            try:
694                dev_null = open("/dev/null", "a")
695            except IOError, e:
696                self.log.error("[get_state]: can't open /dev/null: %s" %e)
697
698            if self.debug:
699                state = 'swapped'
700                rv = 0
701            else:
702                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
703                        close_fds=True)
704                for line in status.stdout:
705                    m = state_re.match(line)
706                    if m: state = m.group(1)
707                    else:
708                        for reg, st in ((no_exp_re, "none"),
709                                (swapping_re, "swapping")):
710                            m = reg.match(line)
711                            if m: state = st
712                rv = status.wait()
713
714            # If the experiment is not present the subcommand returns a
715            # non-zero return value.  If we successfully parsed a "none"
716            # outcome, ignore the return code.
717            if rv != 0 and state != 'none':
718                raise service_error(service_error.internal,
719                        "Cannot get status of segment %s:%s/%s" % \
720                                (tb, pid, eid))
721            elif state not in ('active', 'swapped', 'swapping', 'none'):
722                raise service_error(service_error.internal,
723                        "Cannot get status of segment %s:%s/%s" % \
724                                (tb, pid, eid))
725            else: return state
726
727
728        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
729            """
730            Start a sub-experiment on a federant.
731
732            Get the current state, modify or create as appropriate, ship data
733            and configs and start the experiment.  There are small ordering
734            differences based on the initial state of the sub-experiment.
735            """
736            # ops node in the federant
737            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
738            user = tbparams[tb]['user']     # federant user
739            pid = tbparams[tb]['project']   # federant project
740            # XXX
741            base_confs = ( "hosts",)
742            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
743            # Configuration directories on the remote machine
744            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
745            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
746            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
747
748            state = self.get_state(user, host, tb, pid, eid)
749
750            self.log.debug("[start_segment]: %s: %s" % (tb, state))
751            self.log.info("[start_segment]:transferring experiment to %s" % tb)
752
753            if not self.scp_file("%s/%s/%s" % \
754                    (tmpdir, tb, tclfile), user, host):
755                return False
756           
757            if state == 'none':
758                # Create a null copy of the experiment so that we capture any
759                # logs there if the modify fails.  Emulab software discards the
760                # logs from a failed startexp
761                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
762                    return False
763                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
764                timedout = False
765                try:
766                    if not self.ssh_cmd(user, host,
767                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
768                            "-e %s null.tcl") % (pid, eid), "startexp",
769                            timeout=60 * 10):
770                        return False
771                except self.ssh_cmd_timeout:
772                    timedout = True
773
774                if timedout:
775                    state = self.get_state(user, host, tb, pid, eid)
776                    if state != "swapped":
777                        return False
778
779           
780            # Open up a temporary file to contain a script for setting up the
781            # filespace for the new experiment.
782            self.log.info("[start_segment]: creating script file")
783            try:
784                sf, scriptname = tempfile.mkstemp()
785                scriptfile = os.fdopen(sf, 'w')
786            except IOError:
787                return False
788
789            scriptbase = os.path.basename(scriptname)
790
791            # Script the filesystem changes
792            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
793            # Clear and create the tarfiles and rpm directories
794            for d in (tarfiles_dir, rpms_dir):
795                print >>scriptfile, "/bin/rm -rf %s/*" % d
796                print >>scriptfile, "mkdir -p %s" % d
797            print >>scriptfile, 'mkdir -p %s' % proj_dir
798            self.create_config_tree("%s/%s" % (tmpdir, tb),
799                    proj_dir, scriptfile)
800            if os.path.isdir("%s/tarfiles" % tmpdir):
801                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
802                        scriptfile)
803            if os.path.isdir("%s/rpms" % tmpdir):
804                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
805                        scriptfile)
806            print >>scriptfile, "rm -f %s" % scriptbase
807            scriptfile.close()
808
809            # Move the script to the remote machine
810            # XXX: could collide tempfile names on the remote host
811            if self.scp_file(scriptname, user, host, scriptbase):
812                os.remove(scriptname)
813            else:
814                return False
815
816            # Execute the script (and the script's last line deletes it)
817            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
818                return False
819
820            for f in base_confs:
821                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
822                        "%s/%s" % (proj_dir, f)):
823                    return False
824            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
825                    proj_dir):
826                return False
827            if os.path.isdir("%s/tarfiles" % tmpdir):
828                if not self.ship_configs(host, user,
829                        "%s/tarfiles" % tmpdir, tarfiles_dir):
830                    return False
831            if os.path.isdir("%s/rpms" % tmpdir):
832                if not self.ship_configs(host, user,
833                        "%s/rpms" % tmpdir, tarfiles_dir):
834                    return False
835            # Stage the new configuration (active experiments will stay swapped
836            # in now)
837            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
838            try:
839                if not self.ssh_cmd(user, host,
840                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
841                                (pid, eid, tclfile),
842                        "modexp", timeout= 60 * 10):
843                    return False
844            except self.ssh_cmd_timeout:
845                self.log.error("Modify command failed to complete in time")
846                # There's really no way to see if this succeeded or failed, so
847                # if it hangs, assume the worst.
848                return False
849            # Active experiments are still swapped, this swaps the others in.
850            if state != 'active':
851                self.log.info("[start_segment]: Swapping %s in on %s" % \
852                        (eid, tb))
853                timedout = False
854                try:
855                    if not self.ssh_cmd(user, host,
856                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
857                            "swapexp", timeout=10*60):
858                        return False
859                except self.ssh_cmd_timeout:
860                    timedout = True
861               
862                # If the command was terminated, but completed successfully,
863                # report success.
864                if timedout:
865                    self.log.debug("[start_segment]: swapin timed out " +\
866                            "checking state")
867                    state = self.get_state(user, host, tb, pid, eid)
868                    self.log.debug("[start_segment]: state is %s" % state)
869                    return state == 'active'
870            # Everything has gone OK.
871            return True
872
873    class stop_segment(emulab_segment):
874        def __init__(self, log=None, keyfile=None, debug=False):
875            experiment_control_local.emulab_segment.__init__(self,
876                    log=log, keyfile=keyfile, debug=debug)
877
878        def __call__(self, tb, eid, tbparams):
879            """
880            Stop a sub experiment by calling swapexp on the federant
881            """
882            user = tbparams[tb]['user']
883            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
884            pid = tbparams[tb]['project']
885
886            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
887            rv = False
888            try:
889                # Clean out tar files: we've gone over quota in the past
890                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
891                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
892                        (pid, eid))
893                rv = self.ssh_cmd(user, host,
894                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
895            except self.ssh_cmd_timeout:
896                rv = False
897            return rv
898
899       
900    def generate_ssh_keys(self, dest, type="rsa" ):
901        """
902        Generate a set of keys for the gateways to use to talk.
903
904        Keys are of type type and are stored in the required dest file.
905        """
906        valid_types = ("rsa", "dsa")
907        t = type.lower();
908        if t not in valid_types: raise ValueError
909        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
910
911        try:
912            trace = open("/dev/null", "w")
913        except IOError:
914            raise service_error(service_error.internal,
915                    "Cannot open /dev/null??");
916
917        # May raise CalledProcessError
918        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
919        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
920        if rv != 0:
921            raise service_error(service_error.internal, 
922                    "Cannot generate nonce ssh keys.  %s return code %d" \
923                            % (self.ssh_keygen, rv))
924
925    def gentopo(self, str):
926        """
927        Generate the topology dtat structure from the splitter's XML
928        representation of it.
929
930        The topology XML looks like:
931            <experiment>
932                <nodes>
933                    <node><vname></vname><ips>ip1:ip2</ips></node>
934                </nodes>
935                <lans>
936                    <lan>
937                        <vname></vname><vnode></vnode><ip></ip>
938                        <bandwidth></bandwidth><member>node:port</member>
939                    </lan>
940                </lans>
941        """
942        class topo_parse:
943            """
944            Parse the topology XML and create the dats structure.
945            """
946            def __init__(self):
947                # Typing of the subelements for data conversion
948                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
949                self.int_subelements = ( 'bandwidth',)
950                self.float_subelements = ( 'delay',)
951                # The final data structure
952                self.nodes = [ ]
953                self.lans =  [ ]
954                self.topo = { \
955                        'node': self.nodes,\
956                        'lan' : self.lans,\
957                    }
958                self.element = { }  # Current element being created
959                self.chars = ""     # Last text seen
960
961            def end_element(self, name):
962                # After each sub element the contents is added to the current
963                # element or to the appropriate list.
964                if name == 'node':
965                    self.nodes.append(self.element)
966                    self.element = { }
967                elif name == 'lan':
968                    self.lans.append(self.element)
969                    self.element = { }
970                elif name in self.str_subelements:
971                    self.element[name] = self.chars
972                    self.chars = ""
973                elif name in self.int_subelements:
974                    self.element[name] = int(self.chars)
975                    self.chars = ""
976                elif name in self.float_subelements:
977                    self.element[name] = float(self.chars)
978                    self.chars = ""
979
980            def found_chars(self, data):
981                self.chars += data.rstrip()
982
983
984        tp = topo_parse();
985        parser = xml.parsers.expat.ParserCreate()
986        parser.EndElementHandler = tp.end_element
987        parser.CharacterDataHandler = tp.found_chars
988
989        parser.Parse(str)
990
991        return tp.topo
992       
993
994    def genviz(self, topo):
995        """
996        Generate the visualization the virtual topology
997        """
998
999        neato = "/usr/local/bin/neato"
1000        # These are used to parse neato output and to create the visualization
1001        # file.
1002        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
1003        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
1004                "%s</type></node>"
1005
1006        try:
1007            # Node names
1008            nodes = [ n['vname'] for n in topo['node'] ]
1009            topo_lans = topo['lan']
1010        except KeyError, e:
1011            raise service_error(service_error.internal, "Bad topology: %s" %e)
1012
1013        lans = { }
1014        links = { }
1015
1016        # Walk through the virtual topology, organizing the connections into
1017        # 2-node connections (links) and more-than-2-node connections (lans).
1018        # When a lan is created, it's added to the list of nodes (there's a
1019        # node in the visualization for the lan).
1020        for l in topo_lans:
1021            if links.has_key(l['vname']):
1022                if len(links[l['vname']]) < 2:
1023                    links[l['vname']].append(l['vnode'])
1024                else:
1025                    nodes.append(l['vname'])
1026                    lans[l['vname']] = links[l['vname']]
1027                    del links[l['vname']]
1028                    lans[l['vname']].append(l['vnode'])
1029            elif lans.has_key(l['vname']):
1030                lans[l['vname']].append(l['vnode'])
1031            else:
1032                links[l['vname']] = [ l['vnode'] ]
1033
1034
1035        # Open up a temporary file for dot to turn into a visualization
1036        try:
1037            df, dotname = tempfile.mkstemp()
1038            dotfile = os.fdopen(df, 'w')
1039        except IOError:
1040            raise service_error(service_error.internal,
1041                    "Failed to open file in genviz")
1042
1043        try:
1044            dnull = open('/dev/null', 'w')
1045        except IOError:
1046            service_error(service_error.internal,
1047                    "Failed to open /dev/null in genviz")
1048
1049        # Generate a dot/neato input file from the links, nodes and lans
1050        try:
1051            print >>dotfile, "graph G {"
1052            for n in nodes:
1053                print >>dotfile, '\t"%s"' % n
1054            for l in links.keys():
1055                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1056            for l in lans.keys():
1057                for n in lans[l]:
1058                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1059            print >>dotfile, "}"
1060            dotfile.close()
1061        except TypeError:
1062            raise service_error(service_error.internal,
1063                    "Single endpoint link in vtopo")
1064        except IOError:
1065            raise service_error(service_error.internal, "Cannot write dot file")
1066
1067        # Use dot to create a visualization
1068        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1069                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1070                close_fds=True)
1071        dnull.close()
1072
1073        # Translate dot to vis format
1074        vis_nodes = [ ]
1075        vis = { 'node': vis_nodes }
1076        for line in dot.stdout:
1077            m = vis_re.match(line)
1078            if m:
1079                vn = m.group(1)
1080                vis_node = {'name': vn, \
1081                        'x': float(m.group(2)),\
1082                        'y' : float(m.group(3)),\
1083                    }
1084                if vn in links.keys() or vn in lans.keys():
1085                    vis_node['type'] = 'lan'
1086                else:
1087                    vis_node['type'] = 'node'
1088                vis_nodes.append(vis_node)
1089        rv = dot.wait()
1090
1091        os.remove(dotname)
1092        if rv == 0 : return vis
1093        else: return None
1094
1095    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1096            access_user):
1097        """
1098        Get access to testbed through fedd and set the parameters for that tb
1099        """
1100        uri = self.tbmap.get(tb, None)
1101        if not uri:
1102            raise service_error(serice_error.server_config, 
1103                    "Unknown testbed: %s" % tb)
1104
1105        # currently this lumps all users into one service access group
1106        service_keys = [ a for u in user \
1107                for a in u.get('access', []) \
1108                    if a.has_key('sshPubkey')]
1109
1110        if len(service_keys) == 0:
1111            raise service_error(service_error.req, 
1112                    "Must have at least one SSH pubkey for services")
1113
1114
1115        for p, u in access_user:
1116            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1117                    "to %s") %  ((p or "None"), u, uri))
1118
1119            if p:
1120                # Request with user and project specified
1121                req = {\
1122                        'destinationTestbed' : { 'uri' : uri },
1123                        'project': { 
1124                            'name': {'localname': p},
1125                            'user': [ {'userID': { 'localname': u } } ],
1126                            },
1127                        'user':  user,
1128                        'allocID' : { 'localname': 'test' },
1129                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1130                        'serviceAccess' : service_keys
1131                    }
1132            else:
1133                # Request with only user specified
1134                req = {\
1135                        'destinationTestbed' : { 'uri' : uri },
1136                        'user':  [ {'userID': { 'localname': u } } ],
1137                        'allocID' : { 'localname': 'test' },
1138                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1139                        'serviceAccess' : service_keys
1140                    }
1141
1142            if tb == master:
1143                # NB, the export_project parameter is a dict that includes
1144                # the type
1145                req['exportProject'] = export_project
1146
1147            # node resources if any
1148            if nodes != None and len(nodes) > 0:
1149                rnodes = [ ]
1150                for n in nodes:
1151                    rn = { }
1152                    image, hw, count = n.split(":")
1153                    if image: rn['image'] = [ image ]
1154                    if hw: rn['hardware'] = [ hw ]
1155                    if count and int(count) >0 : rn['count'] = int(count)
1156                    rnodes.append(rn)
1157                req['resources']= { }
1158                req['resources']['node'] = rnodes
1159
1160            try:
1161                if self.local_access.has_key(uri):
1162                    # Local access call
1163                    req = { 'RequestAccessRequestBody' : req }
1164                    r = self.local_access[uri].RequestAccess(req, 
1165                            fedid(file=self.cert_file))
1166                    r = { 'RequestAccessResponseBody' : r }
1167                else:
1168                    r = self.call_RequestAccess(uri, req, 
1169                            self.cert_file, self.cert_pwd, self.trusted_certs)
1170            except service_error, e:
1171                if e.code == service_error.access:
1172                    self.log.debug("[get_access] Access denied")
1173                    r = None
1174                    continue
1175                else:
1176                    raise e
1177
1178            if r.has_key('RequestAccessResponseBody'):
1179                # Through to here we have a valid response, not a fault.
1180                # Access denied is a fault, so something better or worse than
1181                # access denied has happened.
1182                r = r['RequestAccessResponseBody']
1183                self.log.debug("[get_access] Access granted")
1184                break
1185            else:
1186                raise service_error(service_error.protocol,
1187                        "Bad proxy response")
1188       
1189        if not r:
1190            raise service_error(service_error.access, 
1191                    "Access denied by %s (%s)" % (tb, uri))
1192
1193        e = r['emulab']
1194        p = e['project']
1195        tbparam[tb] = { 
1196                "boss": e['boss'],
1197                "host": e['ops'],
1198                "domain": e['domain'],
1199                "fs": e['fileServer'],
1200                "eventserver": e['eventServer'],
1201                "project": unpack_id(p['name']),
1202                "emulab" : e,
1203                "allocID" : r['allocID'],
1204                }
1205        # Make the testbed name be the label the user applied
1206        p['testbed'] = {'localname': tb }
1207
1208        for u in p['user']:
1209            role = u.get('role', None)
1210            if role == 'experimentCreation':
1211                tbparam[tb]['user'] = unpack_id(u['userID'])
1212                break
1213        else:
1214            raise service_error(service_error.internal, 
1215                    "No createExperimentUser from %s" %tb)
1216
1217        # Add attributes to barameter space.  We don't allow attributes to
1218        # overlay any parameters already installed.
1219        for a in e['fedAttr']:
1220            try:
1221                if a['attribute'] and isinstance(a['attribute'], basestring)\
1222                        and not tbparam[tb].has_key(a['attribute'].lower()):
1223                    tbparam[tb][a['attribute'].lower()] = a['value']
1224            except KeyError:
1225                self.log.error("Bad attribute in response: %s" % a)
1226       
1227    def release_access(self, tb, aid):
1228        """
1229        Release access to testbed through fedd
1230        """
1231
1232        uri = self.tbmap.get(tb, None)
1233        if not uri:
1234            raise service_error(serice_error.server_config, 
1235                    "Unknown testbed: %s" % tb)
1236
1237        if self.local_access.has_key(uri):
1238            resp = self.local_access[uri].ReleaseAccess(\
1239                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1240                    fedid(file=self.cert_file))
1241            resp = { 'ReleaseAccessResponseBody': resp } 
1242        else:
1243            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1244                    self.cert_file, self.cert_pwd, self.trusted_certs)
1245
1246        # better error coding
1247
1248    def remote_splitter(self, uri, desc, master):
1249
1250        req = {
1251                'description' : { 'ns2description': desc },
1252                'master': master,
1253                'include_fedkit': bool(self.fedkit),
1254                'include_gatewaykit': bool(self.gatewaykit)
1255            }
1256
1257        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1258                self.trusted_certs)
1259
1260        if r.has_key('Ns2SplitResponseBody'):
1261            r = r['Ns2SplitResponseBody']
1262            if r.has_key('output'):
1263                return r['output'].splitlines()
1264            else:
1265                raise service_error(service_error.protocol, 
1266                        "Bad splitter response (no output)")
1267        else:
1268            raise service_error(service_error.protocol, "Bad splitter response")
1269       
1270    class current_testbed:
1271        """
1272        Object for collecting the current testbed description.  The testbed
1273        description is saved to a file with the local testbed variables
1274        subsittuted line by line.
1275        """
1276        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1277            def tar_list_to_string(tl):
1278                if tl is None: return None
1279
1280                rv = ""
1281                for t in tl:
1282                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1283                            (t[0], os.path.basename(t[1]))
1284                return rv
1285
1286
1287            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1288            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1289            self.current_testbed = None
1290            self.testbed_file = None
1291
1292            self.def_expstart = \
1293                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1294            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1295            self.def_gwstart = \
1296                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1297            self.def_mgwstart = \
1298                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1299            self.def_gwimage = "FBSD61-TUNNEL2";
1300            self.def_gwtype = "pc";
1301            self.def_mgwcmd = '# '
1302            self.def_mgwcmdparams = ''
1303            self.def_gwcmd = '# '
1304            self.def_gwcmdparams = ''
1305
1306            self.eid = eid
1307            self.tmpdir = tmpdir
1308            # Convert fedkit and gateway kit (which are lists of tuples) into a
1309            # substituition string.
1310            self.fedkit = tar_list_to_string(fedkit)
1311            self.gatewaykit = tar_list_to_string(gatewaykit)
1312
1313        def __call__(self, line, master, allocated, tbparams):
1314            # Capture testbed topology descriptions
1315            if self.current_testbed == None:
1316                m = self.begin_testbed.match(line)
1317                if m != None:
1318                    self.current_testbed = m.group(1)
1319                    if self.current_testbed == None:
1320                        raise service_error(service_error.req,
1321                                "Bad request format (unnamed testbed)")
1322                    allocated[self.current_testbed] = \
1323                            allocated.get(self.current_testbed,0) + 1
1324                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1325                    if not os.path.exists(tb_dir):
1326                        try:
1327                            os.mkdir(tb_dir)
1328                        except IOError:
1329                            raise service_error(service_error.internal,
1330                                    "Cannot create %s" % tb_dir)
1331                    try:
1332                        self.testbed_file = open("%s/%s.%s.tcl" %
1333                                (tb_dir, self.eid, self.current_testbed), 'w')
1334                    except IOError:
1335                        self.testbed_file = None
1336                    return True
1337                else: return False
1338            else:
1339                m = self.end_testbed.match(line)
1340                if m != None:
1341                    if m.group(1) != self.current_testbed:
1342                        raise service_error(service_error.internal, 
1343                                "Mismatched testbed markers!?")
1344                    if self.testbed_file != None: 
1345                        self.testbed_file.close()
1346                        self.testbed_file = None
1347                    self.current_testbed = None
1348                elif self.testbed_file:
1349                    # Substitute variables and put the line into the local
1350                    # testbed file.
1351                    gwtype = tbparams[self.current_testbed].get(\
1352                            'connectortype', self.def_gwtype)
1353                    gwimage = tbparams[self.current_testbed].get(\
1354                            'connectorimage', self.def_gwimage)
1355                    mgwstart = tbparams[self.current_testbed].get(\
1356                            'masterconnectorstartcmd', self.def_mgwstart)
1357                    mexpstart = tbparams[self.current_testbed].get(\
1358                            'masternodestartcmd', self.def_mexpstart)
1359                    gwstart = tbparams[self.current_testbed].get(\
1360                            'slaveconnectorstartcmd', self.def_gwstart)
1361                    expstart = tbparams[self.current_testbed].get(\
1362                            'slavenodestartcmd', self.def_expstart)
1363                    project = tbparams[self.current_testbed].get('project')
1364                    gwcmd = tbparams[self.current_testbed].get(\
1365                            'slaveconnectorcmd', self.def_gwcmd)
1366                    gwcmdparams = tbparams[self.current_testbed].get(\
1367                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1368                    mgwcmd = tbparams[self.current_testbed].get(\
1369                            'masterconnectorcmd', self.def_gwcmd)
1370                    mgwcmdparams = tbparams[self.current_testbed].get(\
1371                            'masterconnectorcmdparams', self.def_gwcmdparams)
1372                    line = re.sub("GWTYPE", gwtype, line)
1373                    line = re.sub("GWIMAGE", gwimage, line)
1374                    if self.current_testbed == master:
1375                        line = re.sub("GWSTART", mgwstart, line)
1376                        line = re.sub("EXPSTART", mexpstart, line)
1377                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1378                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1379                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1380                    else:
1381                        line = re.sub("GWSTART", gwstart, line)
1382                        line = re.sub("EXPSTART", expstart, line)
1383                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1384                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1385                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1386                    #These expansions contain EID and PROJDIR.  NB these are
1387                    # local fedkit and gatewaykit, which are strings.
1388                    if self.fedkit:
1389                        line = re.sub("FEDKIT", self.fedkit, line)
1390                    if self.gatewaykit:
1391                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1392                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1393                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1394                    line = re.sub("EID", self.eid, line)
1395                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1396                            (project, self.eid), line)
1397                    print >>self.testbed_file, line
1398                return True
1399
1400    class allbeds:
1401        """
1402        Process the Allbeds section.  Get access to each federant and save the
1403        parameters in tbparams
1404        """
1405        def __init__(self, get_access):
1406            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1407            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1408            self.in_allbeds = False
1409            self.get_access = get_access
1410
1411        def __call__(self, line, user, tbparams, master, export_project,
1412                access_user):
1413            # Testbed access parameters
1414            if not self.in_allbeds:
1415                if self.begin_allbeds.match(line):
1416                    self.in_allbeds = True
1417                    return True
1418                else:
1419                    return False
1420            else:
1421                if self.end_allbeds.match(line):
1422                    self.in_allbeds = False
1423                else:
1424                    nodes = line.split('|')
1425                    tb = nodes.pop(0)
1426                    self.get_access(tb, nodes, user, tbparams, master,
1427                            export_project, access_user)
1428                return True
1429
1430    class gateways:
1431        def __init__(self, eid, master, tmpdir, gw_pubkey,
1432                gw_secretkey, copy_file, fedkit):
1433            self.begin_gateways = \
1434                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1435            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1436            self.current_gateways = None
1437            self.control_gateway = None
1438            self.active_end = { }
1439
1440            self.eid = eid
1441            self.master = master
1442            self.tmpdir = tmpdir
1443            self.gw_pubkey_base = gw_pubkey
1444            self.gw_secretkey_base = gw_secretkey
1445
1446            self.copy_file = copy_file
1447            self.fedkit = fedkit
1448
1449
1450        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1451                active_end, tbparams, dtb, myname, desthost, type):
1452            """
1453            Produce a gateway configuration file from a gateways line.
1454            """
1455
1456            sproject = tbparams[gw].get('project', 'project')
1457            dproject = tbparams[dtb].get('project', 'project')
1458            sdomain = ".%s.%s%s" % (eid, sproject,
1459                    tbparams[gw].get('domain', ".example.com"))
1460            ddomain = ".%s.%s%s" % (eid, dproject,
1461                    tbparams[dtb].get('domain', ".example.com"))
1462            boss = tbparams[master].get('boss', "boss")
1463            fs = tbparams[master].get('fs', "fs")
1464            event_server = "%s%s" % \
1465                    (tbparams[gw].get('eventserver', "event_server"),
1466                            tbparams[gw].get('domain', "example.com"))
1467            remote_event_server = "%s%s" % \
1468                    (tbparams[dtb].get('eventserver', "event_server"),
1469                            tbparams[dtb].get('domain', "example.com"))
1470            seer_control = "%s%s" % \
1471                    (tbparams[gw].get('control', "control"), sdomain)
1472            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1473
1474            if self.fedkit:
1475                remote_script_dir = "/usr/local/federation/bin"
1476                local_script_dir = "/usr/local/federation/bin"
1477            else:
1478                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1479                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1480
1481            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1482            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1483            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1484
1485            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1486            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1487
1488            # translate to lower case so the `hostname` hack for specifying
1489            # configuration files works.
1490            conf_file = conf_file.lower();
1491            remote_conf_file = remote_conf_file.lower();
1492
1493            if dtb == master:
1494                active = "false"
1495            elif gw == master:
1496                active = "true"
1497            elif active_end.has_key('%s-%s' % (dtb, gw)):
1498                active = "false"
1499            else:
1500                active_end['%s-%s' % (gw, dtb)] = 1
1501                active = "true"
1502
1503            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1504            print >>gwconfig, "Active: %s" % active
1505            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1506            if tunnel_iface:
1507                print >>gwconfig, "Interface: %s" % tunnel_iface
1508            print >>gwconfig, "BossName: %s" % boss
1509            print >>gwconfig, "FsName: %s" % fs
1510            print >>gwconfig, "EventServerName: %s" % event_server
1511            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1512            print >>gwconfig, "SeerControl: %s" % seer_control
1513            print >>gwconfig, "Type: %s" % type
1514            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1515            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1516                    local_script_dir
1517            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1518            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1519            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1520                    (remote_conf_dir, remote_conf_file)
1521            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1522            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1523            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1524            gwconfig.close()
1525
1526            return active == "true"
1527
1528        def __call__(self, line, allocated, tbparams):
1529            # Process gateways
1530            if not self.current_gateways:
1531                m = self.begin_gateways.match(line)
1532                if m:
1533                    self.current_gateways = m.group(1)
1534                    if allocated.has_key(self.current_gateways):
1535                        # This test should always succeed
1536                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1537                        if not os.path.exists(tb_dir):
1538                            try:
1539                                os.mkdir(tb_dir)
1540                            except IOError:
1541                                raise service_error(service_error.internal,
1542                                        "Cannot create %s" % tb_dir)
1543                    else:
1544                        # XXX
1545                        self.log.error("[gateways]: Ignoring gateways for " + \
1546                                "unknown testbed %s" % self.current_gateways)
1547                        self.current_gateways = None
1548                    return True
1549                else:
1550                    return False
1551            else:
1552                m = self.end_gateways.match(line)
1553                if m :
1554                    if m.group(1) != self.current_gateways:
1555                        raise service_error(service_error.internal,
1556                                "Mismatched gateway markers!?")
1557                    if self.control_gateway:
1558                        try:
1559                            cc = open("%s/%s/client.conf" %
1560                                    (self.tmpdir, self.current_gateways), 'w')
1561                            print >>cc, "ControlGateway: %s" % \
1562                                    self.control_gateway
1563                            if tbparams[self.master].has_key('smbshare'):
1564                                print >>cc, "SMBSHare: %s" % \
1565                                        tbparams[self.master]['smbshare']
1566                            print >>cc, "ProjectUser: %s" % \
1567                                    tbparams[self.master]['user']
1568                            print >>cc, "ProjectName: %s" % \
1569                                    tbparams[self.master]['project']
1570                            print >>cc, "ExperimentID: %s/%s" % \
1571                                    ( tbparams[self.master]['project'], \
1572                                    self.eid )
1573                            cc.close()
1574                        except IOError:
1575                            raise service_error(service_error.internal,
1576                                    "Error creating client config")
1577                        # XXX: This seer specific file should disappear
1578                        try:
1579                            cc = open("%s/%s/seer.conf" %
1580                                    (self.tmpdir, self.current_gateways),
1581                                    'w')
1582                            if self.current_gateways != self.master:
1583                                print >>cc, "ControlNode: %s" % \
1584                                        self.control_gateway
1585                            print >>cc, "ExperimentID: %s/%s" % \
1586                                    ( tbparams[self.master]['project'], \
1587                                    self.eid )
1588                            cc.close()
1589                        except IOError:
1590                            raise service_error(service_error.internal,
1591                                    "Error creating seer config")
1592                    else:
1593                        debug.error("[gateways]: No control gateway for %s" %\
1594                                    self.current_gateways)
1595                    self.current_gateways = None
1596                else:
1597                    dtb, myname, desthost, type = line.split(" ")
1598
1599                    if type == "control" or type == "both":
1600                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1601                                self.eid, 
1602                                tbparams[self.current_gateways]['project'],
1603                                tbparams[self.current_gateways]['domain'])
1604                    try:
1605                        active = self.gateway_conf_file(self.current_gateways,
1606                                self.master, self.eid, self.gw_pubkey_base,
1607                                self.gw_secretkey_base,
1608                                self.active_end, tbparams, dtb, myname,
1609                                desthost, type)
1610                    except IOError, e:
1611                        raise service_error(service_error.internal,
1612                                "Failed to write config file for %s" % \
1613                                        self.current_gateway)
1614           
1615                    gw_pubkey = "%s/keys/%s" % \
1616                            (self.tmpdir, self.gw_pubkey_base)
1617                    gw_secretkey = "%s/keys/%s" % \
1618                            (self.tmpdir, self.gw_secretkey_base)
1619
1620                    pkfile = "%s/%s/%s" % \
1621                            ( self.tmpdir, self.current_gateways, 
1622                                    self.gw_pubkey_base)
1623                    skfile = "%s/%s/%s" % \
1624                            ( self.tmpdir, self.current_gateways, 
1625                                    self.gw_secretkey_base)
1626
1627                    if not os.path.exists(pkfile):
1628                        try:
1629                            self.copy_file(gw_pubkey, pkfile)
1630                        except IOError:
1631                            service_error(service_error.internal,
1632                                    "Failed to copy pubkey file")
1633
1634                    if active and not os.path.exists(skfile):
1635                        try:
1636                            self.copy_file(gw_secretkey, skfile)
1637                        except IOError:
1638                            service_error(service_error.internal,
1639                                    "Failed to copy secretkey file")
1640                return True
1641
1642    class shunt_to_file:
1643        """
1644        Simple class to write data between two regexps to a file.
1645        """
1646        def __init__(self, begin, end, filename):
1647            """
1648            Begin shunting on a match of begin, stop on end, send data to
1649            filename.
1650            """
1651            self.begin = re.compile(begin)
1652            self.end = re.compile(end)
1653            self.in_shunt = False
1654            self.file = None
1655            self.filename = filename
1656
1657        def __call__(self, line):
1658            """
1659            Call this on each line in the input that may be shunted.
1660            """
1661            if not self.in_shunt:
1662                if self.begin.match(line):
1663                    self.in_shunt = True
1664                    try:
1665                        self.file = open(self.filename, "w")
1666                    except:
1667                        self.file = None
1668                        raise
1669                    return True
1670                else:
1671                    return False
1672            else:
1673                if self.end.match(line):
1674                    if self.file: 
1675                        self.file.close()
1676                        self.file = None
1677                    self.in_shunt = False
1678                else:
1679                    if self.file:
1680                        print >>self.file, line
1681                return True
1682
1683    class shunt_to_list:
1684        """
1685        Same interface as shunt_to_file.  Data collected in self.list, one list
1686        element per line.
1687        """
1688        def __init__(self, begin, end):
1689            self.begin = re.compile(begin)
1690            self.end = re.compile(end)
1691            self.in_shunt = False
1692            self.list = [ ]
1693       
1694        def __call__(self, line):
1695            if not self.in_shunt:
1696                if self.begin.match(line):
1697                    self.in_shunt = True
1698                    return True
1699                else:
1700                    return False
1701            else:
1702                if self.end.match(line):
1703                    self.in_shunt = False
1704                else:
1705                    self.list.append(line)
1706                return True
1707
1708    class shunt_to_string:
1709        """
1710        Same interface as shunt_to_file.  Data collected in self.str, all in
1711        one string.
1712        """
1713        def __init__(self, begin, end):
1714            self.begin = re.compile(begin)
1715            self.end = re.compile(end)
1716            self.in_shunt = False
1717            self.str = ""
1718       
1719        def __call__(self, line):
1720            if not self.in_shunt:
1721                if self.begin.match(line):
1722                    self.in_shunt = True
1723                    return True
1724                else:
1725                    return False
1726            else:
1727                if self.end.match(line):
1728                    self.in_shunt = False
1729                else:
1730                    self.str += line
1731                return True
1732
1733    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1734            tbparams, tmpdir, alloc_log=None):
1735        started = { }           # Testbeds where a sub-experiment started
1736                                # successfully
1737
1738        # XXX
1739        fail_soft = False
1740
1741        log = alloc_log or self.log
1742
1743        thread_pool = self.thread_pool(self.nthreads)
1744        threads = [ ]
1745
1746        for tb in [ k for k in allocated.keys() if k != master]:
1747            # Create and start a thread to start the segment, and save it to
1748            # get the return value later
1749            thread_pool.wait_for_slot()
1750            t  = self.pooled_thread(\
1751                    target=self.start_segment(log=log,
1752                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1753                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1754                    pdata=thread_pool, trace_file=self.trace_file)
1755            threads.append(t)
1756            t.start()
1757
1758        # Wait until all finish
1759        thread_pool.wait_for_all_done()
1760
1761        # If none failed, start the master
1762        failed = [ t.getName() for t in threads if not t.rv ]
1763
1764        if len(failed) == 0:
1765            starter = self.start_segment(log=log, 
1766                    keyfile=self.ssh_privkey_file, debug=self.debug)
1767            if not starter(master, eid, tbparams, tmpdir):
1768                failed.append(master)
1769
1770        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1771        # If one failed clean up, unless fail_soft is set
1772        if failed:
1773            if not fail_soft:
1774                thread_pool.clear()
1775                for tb in succeeded:
1776                    # Create and start a thread to stop the segment
1777                    thread_pool.wait_for_slot()
1778                    t  = self.pooled_thread(\
1779                            target=self.stop_segment(log=log,
1780                                keyfile=self.ssh_privkey_file,
1781                                debug=self.debug), 
1782                            args=(tb, eid, tbparams), name=tb,
1783                            pdata=thread_pool, trace_file=self.trace_file)
1784                    t.start()
1785                # Wait until all finish
1786                thread_pool.wait_for_all_done()
1787
1788                # release the allocations
1789                for tb in tbparams.keys():
1790                    self.release_access(tb, tbparams[tb]['allocID'])
1791                # Remove the placeholder
1792                self.state_lock.acquire()
1793                self.state[eid]['experimentStatus'] = 'failed'
1794                if self.state_filename: self.write_state()
1795                self.state_lock.release()
1796
1797                #raise service_error(service_error.federant,
1798                #    "Swap in failed on %s" % ",".join(failed))
1799                log.error("Swap in failed on %s" % ",".join(failed))
1800                return
1801        else:
1802            log.info("[start_segment]: Experiment %s active" % eid)
1803
1804        log.debug("[start_experiment]: removing %s" % tmpdir)
1805
1806        # Walk up tmpdir, deleting as we go
1807        for path, dirs, files in os.walk(tmpdir, topdown=False):
1808            for f in files:
1809                os.remove(os.path.join(path, f))
1810            for d in dirs:
1811                os.rmdir(os.path.join(path, d))
1812        os.rmdir(tmpdir)
1813
1814        # Insert the experiment into our state and update the disk copy
1815        self.state_lock.acquire()
1816        self.state[expid]['experimentStatus'] = 'active'
1817        self.state[eid] = self.state[expid]
1818        if self.state_filename: self.write_state()
1819        self.state_lock.release()
1820        return
1821
1822    def create_experiment(self, req, fid):
1823        """
1824        The external interface to experiment creation called from the
1825        dispatcher.
1826
1827        Creates a working directory, splits the incoming description using the
1828        splitter script and parses out the avrious subsections using the
1829        lcasses above.  Once each sub-experiment is created, use pooled threads
1830        to instantiate them and start it all up.
1831        """
1832
1833        if not self.auth.check_attribute(fid, 'create'):
1834            raise service_error(service_error.access, "Create access denied")
1835
1836        try:
1837            tmpdir = tempfile.mkdtemp(prefix="split-")
1838        except IOError:
1839            raise service_error(service_error.internal, "Cannot create tmp dir")
1840
1841        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1842        gw_secretkey_base = "fed.%s" % self.ssh_type
1843        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1844        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1845        tclfile = tmpdir + "/experiment.tcl"
1846        tbparams = { }
1847        try:
1848            access_user = self.accessdb[fid]
1849        except KeyError:
1850            raise service_error(service_error.internal,
1851                    "Access map and authorizer out of sync in " + \
1852                            "create_experiment for fedid %s"  % fid)
1853
1854        pid = "dummy"
1855        gid = "dummy"
1856        try:
1857            os.mkdir(tmpdir+"/keys")
1858        except OSError:
1859            raise service_error(service_error.internal,
1860                    "Can't make temporary dir")
1861
1862        req = req.get('CreateRequestBody', None)
1863        if not req:
1864            raise service_error(service_error.req,
1865                    "Bad request format (no CreateRequestBody)")
1866        # The tcl parser needs to read a file so put the content into that file
1867        descr=req.get('experimentdescription', None)
1868        if descr:
1869            file_content=descr.get('ns2description', None)
1870            if file_content:
1871                try:
1872                    f = open(tclfile, 'w')
1873                    f.write(file_content)
1874                    f.close()
1875                except IOError:
1876                    raise service_error(service_error.internal,
1877                            "Cannot write temp experiment description")
1878            else:
1879                raise service_error(service_error.req, 
1880                        "Only ns2descriptions supported")
1881        else:
1882            raise service_error(service_error.req, "No experiment description")
1883
1884        # Generate an ID for the experiment (slice) and a certificate that the
1885        # allocator can use to prove they own it.  We'll ship it back through
1886        # the encrypted connection.
1887        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1888
1889        if req.has_key('experimentID') and \
1890                req['experimentID'].has_key('localname'):
1891            overwrite = False
1892            eid = req['experimentID']['localname']
1893            # If there's an old failed experiment here with the same local name
1894            # and accessible by this user, we'll overwrite it, otherwise we'll
1895            # fall through and do the collision avoidance.
1896            old_expid = self.get_experiment_fedid(eid)
1897            if old_expid and self.check_experiment_access(fid, old_expid):
1898                self.state_lock.acquire()
1899                status = self.state[eid].get('experimentStatus', None)
1900                if status and status == 'failed':
1901                    # remove the old access attribute
1902                    self.auth.unset_attribute(fid, old_expid)
1903                    overwrite = True
1904                    del self.state[eid]
1905                    del self.state[old_expid]
1906                self.state_lock.release()
1907            self.state_lock.acquire()
1908            while (self.state.has_key(eid) and not overwrite):
1909                eid += random.choice(string.ascii_letters)
1910            # Initial state
1911            self.state[eid] = {
1912                    'experimentID' : \
1913                            [ { 'localname' : eid }, {'fedid': expid } ],
1914                    'experimentStatus': 'starting',
1915                    'experimentAccess': { 'X509' : expcert },
1916                    'owner': fid,
1917                    'log' : [],
1918                }
1919            self.state[expid] = self.state[eid]
1920            if self.state_filename: self.write_state()
1921            self.state_lock.release()
1922        else:
1923            eid = self.exp_stem
1924            for i in range(0,5):
1925                eid += random.choice(string.ascii_letters)
1926            self.state_lock.acquire()
1927            while (self.state.has_key(eid)):
1928                eid = self.exp_stem
1929                for i in range(0,5):
1930                    eid += random.choice(string.ascii_letters)
1931            # Initial state
1932            self.state[eid] = {
1933                    'experimentID' : \
1934                            [ { 'localname' : eid }, {'fedid': expid } ],
1935                    'experimentStatus': 'starting',
1936                    'experimentAccess': { 'X509' : expcert },
1937                    'owner': fid,
1938                    'log' : [],
1939                }
1940            self.state[expid] = self.state[eid]
1941            if self.state_filename: self.write_state()
1942            self.state_lock.release()
1943
1944        try: 
1945            # This catches exceptions to clear the placeholder if necessary
1946            try:
1947                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1948            except ValueError:
1949                raise service_error(service_error.server_config, 
1950                        "Bad key type (%s)" % self.ssh_type)
1951
1952            user = req.get('user', None)
1953            if user == None:
1954                raise service_error(service_error.req, "No user")
1955
1956            master = req.get('master', None)
1957            if not master:
1958                raise service_error(service_error.req,
1959                        "No master testbed label")
1960            export_project = req.get('exportProject', None)
1961            if not export_project:
1962                raise service_error(service_error.req, "No export project")
1963           
1964            if self.splitter_url:
1965                self.log.debug("Calling remote splitter at %s" % \
1966                        self.splitter_url)
1967                split_data = self.remote_splitter(self.splitter_url,
1968                        file_content, master)
1969            else:
1970                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1971                    str(self.muxmax), '-m', master]
1972
1973                if self.fedkit:
1974                    tclcmd.append('-k')
1975
1976                if self.gatewaykit:
1977                    tclcmd.append('-K')
1978
1979                tclcmd.extend([pid, gid, eid, tclfile])
1980
1981                self.log.debug("running local splitter %s", " ".join(tclcmd))
1982                # This is just fantastic.  As a side effect the parser copies
1983                # tb_compat.tcl into the current directory, so that directory
1984                # must be writable by the fedd user.  Doing this in the
1985                # temporary subdir ensures this is the case.
1986                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1987                        cwd=tmpdir)
1988                split_data = tclparser.stdout
1989
1990            allocated = { }         # Testbeds we can access
1991            # Objects to parse the splitter output (defined above)
1992            parse_current_testbed = self.current_testbed(eid, tmpdir,
1993                    self.fedkit, self.gatewaykit)
1994            parse_allbeds = self.allbeds(self.get_access)
1995            parse_gateways = self.gateways(eid, master, tmpdir,
1996                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1997                    self.fedkit)
1998            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1999                        "^#\s+End\s+Vtopo")
2000            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
2001                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
2002            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
2003                    "^#\s+End\s+tarfiles")
2004            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
2005                    "^#\s+End\s+rpms")
2006
2007            # Working on the split data
2008            for line in split_data:
2009                line = line.rstrip()
2010                if parse_current_testbed(line, master, allocated, tbparams):
2011                    continue
2012                elif parse_allbeds(line, user, tbparams, master, export_project,
2013                        access_user):
2014                    continue
2015                elif parse_gateways(line, allocated, tbparams):
2016                    continue
2017                elif parse_vtopo(line):
2018                    continue
2019                elif parse_hostnames(line):
2020                    continue
2021                elif parse_tarfiles(line):
2022                    continue
2023                elif parse_rpms(line):
2024                    continue
2025                else:
2026                    raise service_error(service_error.internal, 
2027                            "Bad tcl parse? %s" % line)
2028            # Virtual topology and visualization
2029            vtopo = self.gentopo(parse_vtopo.str)
2030            if not vtopo:
2031                raise service_error(service_error.internal, 
2032                        "Failed to generate virtual topology")
2033
2034            vis = self.genviz(vtopo)
2035            if not vis:
2036                raise service_error(service_error.internal, 
2037                        "Failed to generate visualization")
2038
2039           
2040            # save federant information
2041            for k in allocated.keys():
2042                tbparams[k]['federant'] = {\
2043                        'name': [ { 'localname' : eid} ],\
2044                        'emulab': tbparams[k]['emulab'],\
2045                        'allocID' : tbparams[k]['allocID'],\
2046                        'master' : k == master,\
2047                    }
2048
2049            self.state_lock.acquire()
2050            self.state[eid]['vtopo'] = vtopo
2051            self.state[eid]['vis'] = vis
2052            self.state[expid]['federant'] = \
2053                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2054                        if tbparams[tb].has_key('federant') ]
2055            if self.state_filename: self.write_state()
2056            self.state_lock.release()
2057
2058            # Copy tarfiles and rpms needed at remote sites into a staging area
2059            try:
2060                if self.fedkit:
2061                    for t in self.fedkit:
2062                        parse_tarfiles.list.append(t[1])
2063                if self.gatewaykit:
2064                    for t in self.gatewaykit:
2065                        parse_tarfiles.list.append(t[1])
2066                for t in parse_tarfiles.list:
2067                    if not os.path.exists("%s/tarfiles" % tmpdir):
2068                        os.mkdir("%s/tarfiles" % tmpdir)
2069                    self.copy_file(t, "%s/tarfiles/%s" % \
2070                            (tmpdir, os.path.basename(t)))
2071                for r in parse_rpms.list:
2072                    if not os.path.exists("%s/rpms" % tmpdir):
2073                        os.mkdir("%s/rpms" % tmpdir)
2074                    self.copy_file(r, "%s/rpms/%s" % \
2075                            (tmpdir, os.path.basename(r)))
2076                # A null experiment file in case we need to create a remote
2077                # experiment from scratch
2078                f = open("%s/null.tcl" % tmpdir, "w")
2079                print >>f, """
2080set ns [new Simulator]
2081source tb_compat.tcl
2082
2083set a [$ns node]
2084
2085$ns rtproto Session
2086$ns run
2087"""
2088                f.close()
2089
2090            except IOError, e:
2091                raise service_error(service_error.internal, 
2092                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2093
2094        except service_error, e:
2095            # If something goes wrong in the parse (usually an access error)
2096            # clear the placeholder state.  From here on out the code delays
2097            # exceptions.  Failing at this point returns a fault to the remote
2098            # caller.
2099            self.state_lock.acquire()
2100            del self.state[eid]
2101            del self.state[expid]
2102            if self.state_filename: self.write_state()
2103            self.state_lock.release()
2104            raise e
2105
2106
2107        # Start the background swapper and return the starting state.  From
2108        # here on out, the state will stick around a while.
2109
2110        # Let users touch the state
2111        self.auth.set_attribute(fid, expid)
2112        self.auth.set_attribute(expid, expid)
2113        # Override fedids can manipulate state as well
2114        for o in self.overrides:
2115            self.auth.set_attribute(o, expid)
2116
2117        # Create a logger that logs to the experiment's state object as well as
2118        # to the main log file.
2119        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2120        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2121        # XXX: there should be a global one of these rather than repeating the
2122        # code.
2123        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2124                    '%d %b %y %H:%M:%S'))
2125        alloc_log.addHandler(h)
2126       
2127        # Start a thread to do the resource allocation
2128        t  = Thread(target=self.allocate_resources,
2129                args=(allocated, master, eid, expid, expcert, tbparams, 
2130                    tmpdir, alloc_log),
2131                name=eid)
2132        t.start()
2133
2134        rv = {
2135                'experimentID': [
2136                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2137                ],
2138                'experimentStatus': 'starting',
2139                'experimentAccess': { 'X509' : expcert }
2140            }
2141
2142        return rv
2143
2144    class new_start_segment:
2145        def __init__(self, debug=False, log=None, cert_file=None,
2146                cert_pwd=None, trusted_certs=None, caller=None):
2147            self.log = log
2148            self.debug = debug
2149            self.cert_file = cert_file
2150            self.cert_pwd = cert_pwd
2151            self.trusted_certs = None
2152            self.caller = caller
2153
2154        def __call__(self, uri, aid, topo, master, attrs=None):
2155            req = {
2156                    'allocID': { 'fedid' : aid }, 
2157                    'segmentdescription': { 
2158                        'topdldescription': topo.to_dict(),
2159                    },
2160                    'master': master,
2161                }
2162            if attrs:
2163                req['fedAttr'] = attrs
2164
2165            print req
2166            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2167                    self.trusted_certs)
2168            print r
2169            return True
2170
2171
2172
2173    class new_terminate_segment:
2174        def __init__(self, debug=False, log=None, cert_file=None,
2175                cert_pwd=None, trusted_certs=None, caller=None):
2176            self.log = log
2177            self.debug = debug
2178            self.cert_file = cert_file
2179            self.cert_pwd = cert_pwd
2180            self.trusted_certs = None
2181            self.caller = caller
2182
2183        def __call__(self, uri, aid ):
2184            print "in terminate_segment: %s" % aid
2185            req = {
2186                    'allocID': aid , 
2187                }
2188            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2189                    self.trusted_certs)
2190            return True
2191
2192
2193   
2194
2195    def new_allocate_resources(self, allocated, master, eid, expid, expcert, 
2196            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
2197        started = { }           # Testbeds where a sub-experiment started
2198                                # successfully
2199
2200        # XXX
2201        fail_soft = False
2202
2203        log = alloc_log or self.log
2204
2205        thread_pool = self.thread_pool(self.nthreads)
2206        threads = [ ]
2207
2208        for tb in [ k for k in allocated.keys() if k != master]:
2209            # Create and start a thread to start the segment, and save it to
2210            # get the return value later
2211            thread_pool.wait_for_slot()
2212            uri = self.tbmap.get(tb, None)
2213            if not uri:
2214                raise service_error(service_error.internal, 
2215                        "Unknown testbed %s !?" % tb)
2216
2217            if tbparams[tb].has_key('allocID') and \
2218                    tbparams[tb]['allocID'].has_key('fedid'):
2219                aid = tbparams[tb]['allocID']['fedid']
2220            else:
2221                raise service_error(service_error.internal, 
2222                        "No alloc id for testbed %s !?" % tb)
2223
2224            t  = self.pooled_thread(\
2225                    target=self.new_start_segment(log=log, debug=self.debug,
2226                        cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2227                        trusted_certs=self.trusted_certs,
2228                        caller=self.call_StartSegment), 
2229                    args=(uri, aid, topo[tb], False, attrs), name=tb,
2230                    pdata=thread_pool, trace_file=self.trace_file)
2231            threads.append(t)
2232            t.start()
2233
2234        # Wait until all finish
2235        thread_pool.wait_for_all_done()
2236
2237        # If none failed, start the master
2238        failed = [ t.getName() for t in threads if not t.rv ]
2239
2240        if len(failed) == 0:
2241            uri = self.tbmap.get(master, None)
2242            if not uri:
2243                raise service_error(service_error.internal, 
2244                        "Unknown testbed %s !?" % master)
2245
2246            if tbparams[master].has_key('allocID') and \
2247                    tbparams[master]['allocID'].has_key('fedid'):
2248                aid = tbparams[master]['allocID']['fedid']
2249            else:
2250                raise service_error(service_error.internal, 
2251                    "No alloc id for testbed %s !?" % master)
2252            starter = self.new_start_segment(log=log, debug=self.debug,
2253                    cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2254                    trusted_certs=self.trusted_certs,
2255                    caller=self.call_StartSegment)
2256            if not starter(uri, aid, topo[master], True, attrs):
2257                failed.append(master)
2258
2259        succeeded = [tb for tb in allocated.keys() if tb not in failed]
2260        # If one failed clean up, unless fail_soft is set
2261        if failed and False:
2262            if not fail_soft:
2263                thread_pool.clear()
2264                for tb in succeeded:
2265                    # Create and start a thread to stop the segment
2266                    thread_pool.wait_for_slot()
2267                    t  = self.pooled_thread(\
2268                            target=self.stop_segment(log=log,
2269                                keyfile=self.ssh_privkey_file,
2270                                debug=self.debug), 
2271                            args=(tb, eid, tbparams), name=tb,
2272                            pdata=thread_pool, trace_file=self.trace_file)
2273                    t.start()
2274                # Wait until all finish
2275                thread_pool.wait_for_all_done()
2276
2277                # release the allocations
2278                for tb in tbparams.keys():
2279                    self.release_access(tb, tbparams[tb]['allocID'])
2280                # Remove the placeholder
2281                self.state_lock.acquire()
2282                self.state[eid]['experimentStatus'] = 'failed'
2283                if self.state_filename: self.write_state()
2284                self.state_lock.release()
2285
2286                log.error("Swap in failed on %s" % ",".join(failed))
2287                return
2288        else:
2289            log.info("[start_segment]: Experiment %s active" % eid)
2290
2291        log.debug("[start_experiment]: removing %s" % tmpdir)
2292
2293        # Walk up tmpdir, deleting as we go
2294        for path, dirs, files in os.walk(tmpdir, topdown=False):
2295            for f in files:
2296                os.remove(os.path.join(path, f))
2297            for d in dirs:
2298                os.rmdir(os.path.join(path, d))
2299        os.rmdir(tmpdir)
2300
2301        # Insert the experiment into our state and update the disk copy
2302        self.state_lock.acquire()
2303        self.state[expid]['experimentStatus'] = 'active'
2304        self.state[eid] = self.state[expid]
2305        if self.state_filename: self.write_state()
2306        self.state_lock.release()
2307        return
2308
2309
2310    def new_create_experiment(self, req, fid):
2311        """
2312        The external interface to experiment creation called from the
2313        dispatcher.
2314
2315        Creates a working directory, splits the incoming description using the
2316        splitter script and parses out the avrious subsections using the
2317        lcasses above.  Once each sub-experiment is created, use pooled threads
2318        to instantiate them and start it all up.
2319        """
2320
2321        def add_kit(e, kit):
2322            """
2323            Add a Software object created from the list of (install, location)
2324            tuples passed as kit  to the software attribute of an object e.  We
2325            do this enough to break out the code, but it's kind of a hack to
2326            avoid changing the old tuple rep.
2327            """
2328
2329            s = [ topdl.Software(install=i, location=l) for i, l in kit]
2330
2331            if isinstance(e.software, list): e.software.extend(s)
2332            else: e.software = s
2333
2334
2335        if not self.auth.check_attribute(fid, 'create'):
2336            raise service_error(service_error.access, "Create access denied")
2337
2338        try:
2339            tmpdir = tempfile.mkdtemp(prefix="split-")
2340        except IOError:
2341            raise service_error(service_error.internal, "Cannot create tmp dir")
2342
2343        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2344        gw_secretkey_base = "fed.%s" % self.ssh_type
2345        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2346        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2347        tclfile = tmpdir + "/experiment.tcl"
2348        tbparams = { }
2349        try:
2350            access_user = self.accessdb[fid]
2351        except KeyError:
2352            raise service_error(service_error.internal,
2353                    "Access map and authorizer out of sync in " + \
2354                            "create_experiment for fedid %s"  % fid)
2355
2356        pid = "dummy"
2357        gid = "dummy"
2358        try:
2359            os.mkdir(tmpdir+"/keys")
2360        except OSError:
2361            raise service_error(service_error.internal,
2362                    "Can't make temporary dir")
2363
2364        req = req.get('CreateRequestBody', None)
2365        if not req:
2366            raise service_error(service_error.req,
2367                    "Bad request format (no CreateRequestBody)")
2368        # The tcl parser needs to read a file so put the content into that file
2369        descr=req.get('experimentdescription', None)
2370        if descr:
2371            file_content=descr.get('ns2description', None)
2372            if file_content:
2373                try:
2374                    f = open(tclfile, 'w')
2375                    f.write(file_content)
2376                    f.close()
2377                except IOError:
2378                    raise service_error(service_error.internal,
2379                            "Cannot write temp experiment description")
2380            else:
2381                raise service_error(service_error.req, 
2382                        "Only ns2descriptions supported")
2383        else:
2384            raise service_error(service_error.req, "No experiment description")
2385
2386        # Generate an ID for the experiment (slice) and a certificate that the
2387        # allocator can use to prove they own it.  We'll ship it back through
2388        # the encrypted connection.
2389        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2390
2391        if req.has_key('experimentID') and \
2392                req['experimentID'].has_key('localname'):
2393            overwrite = False
2394            eid = req['experimentID']['localname']
2395            # If there's an old failed experiment here with the same local name
2396            # and accessible by this user, we'll overwrite it, otherwise we'll
2397            # fall through and do the collision avoidance.
2398            old_expid = self.get_experiment_fedid(eid)
2399            if old_expid and self.check_experiment_access(fid, old_expid):
2400                self.state_lock.acquire()
2401                status = self.state[eid].get('experimentStatus', None)
2402                if status and status == 'failed':
2403                    # remove the old access attribute
2404                    self.auth.unset_attribute(fid, old_expid)
2405                    overwrite = True
2406                    del self.state[eid]
2407                    del self.state[old_expid]
2408                self.state_lock.release()
2409            self.state_lock.acquire()
2410            while (self.state.has_key(eid) and not overwrite):
2411                eid += random.choice(string.ascii_letters)
2412            # Initial state
2413            self.state[eid] = {
2414                    'experimentID' : \
2415                            [ { 'localname' : eid }, {'fedid': expid } ],
2416                    'experimentStatus': 'starting',
2417                    'experimentAccess': { 'X509' : expcert },
2418                    'owner': fid,
2419                    'log' : [],
2420                }
2421            self.state[expid] = self.state[eid]
2422            if self.state_filename: self.write_state()
2423            self.state_lock.release()
2424        else:
2425            eid = self.exp_stem
2426            for i in range(0,5):
2427                eid += random.choice(string.ascii_letters)
2428            self.state_lock.acquire()
2429            while (self.state.has_key(eid)):
2430                eid = self.exp_stem
2431                for i in range(0,5):
2432                    eid += random.choice(string.ascii_letters)
2433            # Initial state
2434            self.state[eid] = {
2435                    'experimentID' : \
2436                            [ { 'localname' : eid }, {'fedid': expid } ],
2437                    'experimentStatus': 'starting',
2438                    'experimentAccess': { 'X509' : expcert },
2439                    'owner': fid,
2440                    'log' : [],
2441                }
2442            self.state[expid] = self.state[eid]
2443            if self.state_filename: self.write_state()
2444            self.state_lock.release()
2445
2446        try: 
2447            # This catches exceptions to clear the placeholder if necessary
2448            try:
2449                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2450            except ValueError:
2451                raise service_error(service_error.server_config, 
2452                        "Bad key type (%s)" % self.ssh_type)
2453
2454            user = req.get('user', None)
2455            if user == None:
2456                raise service_error(service_error.req, "No user")
2457
2458            master = req.get('master', None)
2459            if not master:
2460                raise service_error(service_error.req,
2461                        "No master testbed label")
2462            export_project = req.get('exportProject', None)
2463            if not export_project:
2464                raise service_error(service_error.req, "No export project")
2465           
2466            if self.splitter_url:
2467                self.log.debug("Calling remote splitter at %s" % \
2468                        self.splitter_url)
2469                split_data = self.remote_splitter(self.splitter_url,
2470                        file_content, master)
2471            else:
2472                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2473                    str(self.muxmax), '-m', master]
2474
2475                if self.fedkit:
2476                    tclcmd.append('-k')
2477
2478                if self.gatewaykit:
2479                    tclcmd.append('-K')
2480
2481                tclcmd.extend([pid, gid, eid, tclfile])
2482
2483                self.log.debug("running local splitter %s", " ".join(tclcmd))
2484                # This is just fantastic.  As a side effect the parser copies
2485                # tb_compat.tcl into the current directory, so that directory
2486                # must be writable by the fedd user.  Doing this in the
2487                # temporary subdir ensures this is the case.
2488                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2489                        cwd=tmpdir)
2490                split_data = tclparser.stdout
2491
2492            allocated = { }         # Testbeds we can access
2493            # Allocate IP addresses: The allocator is a buddy system memory
2494            # allocator.  Allocate from the largest substrate to the
2495            # smallest to make the packing more likely to work - i.e.
2496            # avoiding internal fragmentation.
2497            top = topdl.topology_from_xml(file=split_data, top="experiment")
2498            subs = sorted(top.substrates, 
2499                    cmp=lambda x,y: cmp(len(x.interfaces), 
2500                        len(y.interfaces)),
2501                    reverse=True)
2502            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
2503            ifs = { }
2504            hosts = [ ]
2505            # The config urlpath
2506            configpath = "/%s/config" % expid
2507            # The config file system location
2508            configdir ="%s%s" % ( self.repodir, configpath)
2509
2510            for idx, s in enumerate(subs):
2511                a = ips.allocate(len(s.interfaces)+2)
2512                if a :
2513                    base, num = a
2514                    if num < len(s.interfaces) +2 : 
2515                        raise service_error(service_error.internal,
2516                                "Allocator returned wrong number of IPs??")
2517                else:
2518                    raise service_error(service_error.req, 
2519                            "Cannot allocate IP addresses")
2520
2521                base += 1
2522                for i in s.interfaces:
2523                    i.attribute.append(
2524                            topdl.Attribute('ip4_address', 
2525                                "%s" % ip_addr(base)))
2526                    hname = i.element.name[0]
2527                    if ifs.has_key(hname):
2528                        hosts.append("%s\t%s-%s %s-%d" % \
2529                                (ip_addr(base), hname, s.name, hname,
2530                                    ifs[hname]))
2531                    else:
2532                        ifs[hname] = 0
2533                        hosts.append("%s\t%s-%s %s-%d %s" % \
2534                                (ip_addr(base), hname, s.name, hname,
2535                                    ifs[hname], hname))
2536
2537                    ifs[hname] += 1
2538                    base += 1
2539            # save config files
2540            try:
2541                os.makedirs(configdir)
2542            except IOError, e:
2543                raise service_error(
2544                        "Cannot create config directory: %s" % e)
2545            # Find the testbeds to look up
2546            testbeds = set([ a.value for e in top.elements \
2547                    for a in e.attribute \
2548                        if a.attribute == 'testbed'] )
2549
2550
2551            # Make per testbed topologies.  Copy the main topo and remove
2552            # interfaces and nodes that don't live in the testbed.
2553            topo ={ }
2554            for tb in testbeds:
2555                self.get_access(tb, None, user, tbparams, master,
2556                        export_project, access_user)
2557                allocated[tb] = 1
2558                topo[tb] = top.clone()
2559                to_delete = [ ]
2560                for e in topo[tb].elements:
2561                    etb = e.get_attribute('testbed')
2562                    if etb and etb != tb:
2563                        for i in e.interface:
2564                            for s in i.subs:
2565                                try:
2566                                    s.interfaces.remove(i)
2567                                except ValueError:
2568                                    raise service_error(service_error.internal,
2569                                            "Can't remove interface??")
2570                        to_delete.append(e)
2571                for e in to_delete:
2572                    topo[tb].elements.remove(e)
2573                topo[tb].make_indices()
2574
2575                for e in topo[tb].elements:
2576                    if tb == master:
2577                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
2578                    else:
2579                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
2580                    scmd = e.get_attribute('startup')
2581                    if scmd:
2582                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
2583
2584                    e.set_attribute('startup', cmd)
2585                    if self.fedkit: add_kit(e, self.fedkit)
2586
2587            # Copy configuration files into the remote file store
2588            try:
2589                f = open("%s/hosts" % configdir, "w")
2590                f.write('\n'.join(hosts))
2591                f.close()
2592            except IOError, e:
2593                raise service_error(service_error.internal, 
2594                        "Cannot write hosts file: %s" % e)
2595            try:
2596                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
2597                        (configdir, gw_pubkey_base))
2598                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
2599                        (configdir, gw_secretkey_base))
2600            except IOError, e:
2601                raise service_error(service_error.internal, 
2602                        "Cannot copy keyfiles: %s" % e)
2603
2604            # Allow the individual testbeds to access the configuration files.
2605            for tb in tbparams.keys():
2606                asignee = tbparams[tb]['allocID']['fedid']
2607                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2608                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2609                    print "assigned %s/%s" % (configpath, f)
2610
2611            # Now, for each substrate in the main topology, find those that
2612            # have nodes on more than one testbed.  Insert portal nodes
2613            # into the copies of those substrates on the sub topologies.
2614            for s in top.substrates:
2615                # tbs will contain an ip address on this subsrate that is in
2616                # each testbed.
2617                tbs = { }
2618                for i in s.interfaces:
2619                    e = i.element
2620                    tb = e.get_attribute('testbed')
2621                    if tb and not tbs.has_key(tb):
2622                        for i in e.interface:
2623                            if s in i.subs:
2624                                tbs[tb]= i.get_attribute('ip4_address')
2625                if len(tbs) < 2:
2626                    continue
2627
2628                # More than one testbed is on this substrate.  Insert
2629                # some portals into the subtopologies.  st == source testbed,
2630                # dt == destination testbed.
2631                segment_substrate = { }
2632                for st in tbs.keys():
2633                    segment_substrate[st] = { }
2634                    for dt in [ t for t in tbs.keys() if t != st]:
2635                        myname =  "%stunnel" % dt
2636                        desthost  =  "%stunnel" % st
2637                        sproject = tbparams[st].get('project', 'project')
2638                        dproject = tbparams[dt].get('project', 'project')
2639                        mproject = tbparams[master].get('project', 'project')
2640                        sdomain = tbparams[st].get('domain', ".example.com")
2641                        ddomain = tbparams[dt].get('domain', ".example.com")
2642                        mdomain = tbparams[master].get('domain', '.example.com')
2643                        muser = tbparams[master].get('user', 'root')
2644                        smbshare = tbparams[master].get('smbshare', 'USERS')
2645                        # XXX: active and type need to be unkludged
2646                        active = ("%s" % (st == master))
2647                        if not segment_substrate[st].has_key(dt):
2648                            # Put a substrate and a segment for the connected
2649                            # testbed in there.
2650                            tsubstrate = \
2651                                    topdl.Substrate(name='%s-%s' % (st, dt),
2652                                            attribute= [
2653                                                topdl.Attribute(
2654                                                    attribute='portal',
2655                                                    value='true')
2656                                                ]
2657                                            )
2658                            segment_element = topdl.Segment(
2659                                    id= tbparams[dt]['allocID'],
2660                                    type='emulab',
2661                                    uri = self.tbmap.get(dt, None),
2662                                    interface=[ 
2663                                        topdl.Interface(
2664                                            substrate=tsubstrate.name),
2665                                        ],
2666                                    attribute = [
2667                                        topdl.Attribute(attribute=n, value=v)
2668                                            for n, v in (\
2669                                                ('domain', ddomain),
2670                                                ('experiment', "%s/%s" % \
2671                                                        (dproject, eid)),)
2672                                        ],
2673                                    )
2674                            segment_substrate[st][dt] = tsubstrate
2675                            topo[st].substrates.append(tsubstrate)
2676                            topo[st].elements.append(segment_element)
2677                        portal = topdl.Computer(
2678                                name="%stunnel" % dt,
2679                                attribute=[ 
2680                                    topdl.Attribute(attribute=n,value=v)
2681                                        for n, v in (\
2682                                            ('portal', 'true'),
2683                                            ('domain', sdomain),
2684                                            ('masterdomain', mdomain),
2685                                            ('masterexperiment', "%s/%s" % \
2686                                                    (mproject, eid)),
2687                                            ('masteruser', muser),
2688                                            ('smbshare', smbshare),
2689                                            ('experiment', "%s/%s" % \
2690                                                    (sproject, eid)),
2691                                            ('peer', "%s" % desthost),
2692                                            ('peer_segment', "%s" % \
2693                                                    tbparams[dt]['allocID']['fedid']),
2694                                            ('scriptdir', 
2695                                                "/usr/local/federation/bin"),
2696                                            ('active', "%s" % active),
2697                                            ('portal_type', 'both'), 
2698                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
2699                                    ],
2700                                interface=[
2701                                    topdl.Interface(
2702                                        substrate=s.name,
2703                                        attribute=[ 
2704                                            topdl.Attribute(
2705                                                attribute='ip4_address', 
2706                                                value=tbs[dt]
2707                                            )
2708                                        ]),
2709                                    topdl.Interface(
2710                                        substrate=\
2711                                            segment_substrate[st][dt].name,
2712                                        attribute=[
2713                                            topdl.Attribute(attribute='portal',
2714                                                value='true')
2715                                            ]
2716                                        ),
2717                                    ],
2718                                )
2719                        if self.fedkit: add_kit(portal, self.fedkit)
2720                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
2721
2722                        topo[st].elements.append(portal)
2723
2724            # Connect the gateway nodes into the topologies and clear out
2725            # substrates that are not in the topologies
2726            for tb in testbeds:
2727                topo[tb].incorporate_elements()
2728                topo[tb].substrates = \
2729                        [s for s in topo[tb].substrates \
2730                            if len(s.interfaces) >0]
2731
2732            # Copy the rpms and tarfiles to a distribution directory from
2733            # which the federants can retrieve them
2734            linkpath = "%s/software" %  expid
2735            softdir ="%s/%s" % ( self.repodir, linkpath)
2736            softmap = { }
2737            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
2738                    for p, t in l ])
2739            pkgs.update([x.location for e in top.elements \
2740                    for x in e.software])
2741            try:
2742                os.makedirs(softdir)
2743            except IOError, e:
2744                raise service_error(
2745                        "Cannot create software directory: %s" % e)
2746            for pkg in pkgs:
2747                loc = pkg
2748
2749                scheme, host, path = urlparse(loc)[0:3]
2750                dest = os.path.basename(path)
2751                if not scheme:
2752                    if not loc.startswith('/'):
2753                        loc = "/%s" % loc
2754                    loc = "file://%s" %loc
2755                try:
2756                    u = urlopen(loc)
2757                except Exception, e:
2758                    raise service_error(service_error.req, 
2759                            "Cannot open %s: %s" % (loc, e))
2760                try:
2761                    f = open("%s/%s" % (softdir, dest) , "w")
2762                    self.log.debug("Writing %s/%s" % (softdir,dest) )
2763                    data = u.read(4096)
2764                    while data:
2765                        f.write(data)
2766                        data = u.read(4096)
2767                    f.close()
2768                    u.close()
2769                except Exception, e:
2770                    raise service_error(service_error.internal,
2771                            "Could not copy %s: %s" % (loc, e))
2772                path = re.sub("/tmp", "", linkpath)
2773                # XXX
2774                softmap[pkg] = \
2775                        "https://users.isi.deterlab.net:23232/%s/%s" %\
2776                        ( path, dest)
2777
2778                # Allow the individual testbeds to access the software.
2779                for tb in tbparams.keys():
2780                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
2781                            "/%s/%s" % ( path, dest))
2782
2783            # Convert the software locations in the segments into the local
2784            # copies on this host
2785            for soft in [ s for tb in topo.values() \
2786                    for e in tb.elements \
2787                        if getattr(e, 'software', False) \
2788                            for s in e.software ]:
2789                if softmap.has_key(soft.location):
2790                    soft.location = softmap[soft.location]
2791
2792            vtopo = topdl.topology_to_vtopo(top)
2793            vis = self.genviz(vtopo)
2794
2795            # save federant information
2796            for k in allocated.keys():
2797                tbparams[k]['federant'] = {\
2798                        'name': [ { 'localname' : eid} ],\
2799                        'emulab': tbparams[k]['emulab'],\
2800                        'allocID' : tbparams[k]['allocID'],\
2801                        'master' : k == master,\
2802                    }
2803
2804            self.state_lock.acquire()
2805            self.state[eid]['vtopo'] = vtopo
2806            self.state[eid]['vis'] = vis
2807            self.state[expid]['federant'] = \
2808                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2809                        if tbparams[tb].has_key('federant') ]
2810            if self.state_filename: 
2811                self.write_state()
2812            self.state_lock.release()
2813        except service_error, e:
2814            # If something goes wrong in the parse (usually an access error)
2815            # clear the placeholder state.  From here on out the code delays
2816            # exceptions.  Failing at this point returns a fault to the remote
2817            # caller.
2818
2819            self.state_lock.acquire()
2820            del self.state[eid]
2821            del self.state[expid]
2822            if self.state_filename: self.write_state()
2823            self.state_lock.release()
2824            raise e
2825
2826
2827        # Start the background swapper and return the starting state.  From
2828        # here on out, the state will stick around a while.
2829
2830        # Let users touch the state
2831        self.auth.set_attribute(fid, expid)
2832        self.auth.set_attribute(expid, expid)
2833        # Override fedids can manipulate state as well
2834        for o in self.overrides:
2835            self.auth.set_attribute(o, expid)
2836
2837        # Create a logger that logs to the experiment's state object as well as
2838        # to the main log file.
2839        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2840        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2841        # XXX: there should be a global one of these rather than repeating the
2842        # code.
2843        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2844                    '%d %b %y %H:%M:%S'))
2845        alloc_log.addHandler(h)
2846       
2847        # XXX
2848        url_base = 'https://users.isi.deterlab.net:23232'
2849        attrs = [ 
2850                {
2851                    'attribute': 'ssh_pubkey', 
2852                    'value': '%s/%s/config/%s' % \
2853                            (url_base, expid, gw_pubkey_base)
2854                },
2855                {
2856                    'attribute': 'ssh_secretkey', 
2857                    'value': '%s/%s/config/%s' % \
2858                            (url_base, expid, gw_secretkey_base)
2859                },
2860                {
2861                    'attribute': 'hosts', 
2862                    'value': '%s/%s/config/hosts' % \
2863                            (url_base, expid)
2864                },
2865                {
2866                    'attribute': 'experiment_name',
2867                    'value': eid,
2868                },
2869            ]
2870
2871        # Start a thread to do the resource allocation
2872        t  = Thread(target=self.new_allocate_resources,
2873                args=(allocated, master, eid, expid, expcert, tbparams, 
2874                    topo, tmpdir, alloc_log, attrs),
2875                name=eid)
2876        t.start()
2877
2878        rv = {
2879                'experimentID': [
2880                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2881                ],
2882                'experimentStatus': 'starting',
2883                'experimentAccess': { 'X509' : expcert }
2884            }
2885
2886        return rv
2887   
2888    def get_experiment_fedid(self, key):
2889        """
2890        find the fedid associated with the localname key in the state database.
2891        """
2892
2893        rv = None
2894        self.state_lock.acquire()
2895        if self.state.has_key(key):
2896            if isinstance(self.state[key], dict):
2897                try:
2898                    kl = [ f['fedid'] for f in \
2899                            self.state[key]['experimentID']\
2900                                if f.has_key('fedid') ]
2901                except KeyError:
2902                    self.state_lock.release()
2903                    raise service_error(service_error.internal, 
2904                            "No fedid for experiment %s when getting "+\
2905                                    "fedid(!?)" % key)
2906                if len(kl) == 1:
2907                    rv = kl[0]
2908                else:
2909                    self.state_lock.release()
2910                    raise service_error(service_error.internal, 
2911                            "multiple fedids for experiment %s when " +\
2912                                    "getting fedid(!?)" % key)
2913            else:
2914                self.state_lock.release()
2915                raise service_error(service_error.internal, 
2916                        "Unexpected state for %s" % key)
2917        self.state_lock.release()
2918        return rv
2919
2920    def check_experiment_access(self, fid, key):
2921        """
2922        Confirm that the fid has access to the experiment.  Though a request
2923        may be made in terms of a local name, the access attribute is always
2924        the experiment's fedid.
2925        """
2926        if not isinstance(key, fedid):
2927            key = self.get_experiment_fedid(key)
2928
2929        if self.auth.check_attribute(fid, key):
2930            return True
2931        else:
2932            raise service_error(service_error.access, "Access Denied")
2933
2934
2935    def get_handler(self, path, fid):
2936        print "%s" %  path
2937        if self.auth.check_attribute(fid, path):
2938            return ("%s/%s" % (self.repodir, path), "application/binary")
2939        else:
2940            return (None, None)
2941
2942    def get_vtopo(self, req, fid):
2943        """
2944        Return the stored virtual topology for this experiment
2945        """
2946        rv = None
2947        state = None
2948
2949        req = req.get('VtopoRequestBody', None)
2950        if not req:
2951            raise service_error(service_error.req,
2952                    "Bad request format (no VtopoRequestBody)")
2953        exp = req.get('experiment', None)
2954        if exp:
2955            if exp.has_key('fedid'):
2956                key = exp['fedid']
2957                keytype = "fedid"
2958            elif exp.has_key('localname'):
2959                key = exp['localname']
2960                keytype = "localname"
2961            else:
2962                raise service_error(service_error.req, "Unknown lookup type")
2963        else:
2964            raise service_error(service_error.req, "No request?")
2965
2966        self.check_experiment_access(fid, key)
2967
2968        self.state_lock.acquire()
2969        if self.state.has_key(key):
2970            if self.state[key].has_key('vtopo'):
2971                rv = { 'experiment' : {keytype: key },\
2972                        'vtopo': self.state[key]['vtopo'],\
2973                    }
2974            else:
2975                state = self.state[key]['experimentStatus']
2976        self.state_lock.release()
2977
2978        if rv: return rv
2979        else: 
2980            if state:
2981                raise service_error(service_error.partial, 
2982                        "Not ready: %s" % state)
2983            else:
2984                raise service_error(service_error.req, "No such experiment")
2985
2986    def get_vis(self, req, fid):
2987        """
2988        Return the stored visualization for this experiment
2989        """
2990        rv = None
2991        state = None
2992
2993        req = req.get('VisRequestBody', None)
2994        if not req:
2995            raise service_error(service_error.req,
2996                    "Bad request format (no VisRequestBody)")
2997        exp = req.get('experiment', None)
2998        if exp:
2999            if exp.has_key('fedid'):
3000                key = exp['fedid']
3001                keytype = "fedid"
3002            elif exp.has_key('localname'):
3003                key = exp['localname']
3004                keytype = "localname"
3005            else:
3006                raise service_error(service_error.req, "Unknown lookup type")
3007        else:
3008            raise service_error(service_error.req, "No request?")
3009
3010        self.check_experiment_access(fid, key)
3011
3012        self.state_lock.acquire()
3013        if self.state.has_key(key):
3014            if self.state[key].has_key('vis'):
3015                rv =  { 'experiment' : {keytype: key },\
3016                        'vis': self.state[key]['vis'],\
3017                        }
3018            else:
3019                state = self.state[key]['experimentStatus']
3020        self.state_lock.release()
3021
3022        if rv: return rv
3023        else:
3024            if state:
3025                raise service_error(service_error.partial, 
3026                        "Not ready: %s" % state)
3027            else:
3028                raise service_error(service_error.req, "No such experiment")
3029
3030    def clean_info_response(self, rv):
3031        """
3032        Remove the information in the experiment's state object that is not in
3033        the info response.
3034        """
3035        # Remove the owner info (should always be there, but...)
3036        if rv.has_key('owner'): del rv['owner']
3037
3038        # Convert the log into the allocationLog parameter and remove the
3039        # log entry (with defensive programming)
3040        if rv.has_key('log'):
3041            rv['allocationLog'] = "".join(rv['log'])
3042            del rv['log']
3043        else:
3044            rv['allocationLog'] = ""
3045
3046        if rv['experimentStatus'] != 'active':
3047            if rv.has_key('federant'): del rv['federant']
3048        else:
3049            # remove the allocationID info from each federant
3050            for f in rv.get('federant', []):
3051                if f.has_key('allocID'): del f['allocID']
3052        return rv
3053
3054    def get_info(self, req, fid):
3055        """
3056        Return all the stored info about this experiment
3057        """
3058        rv = None
3059
3060        req = req.get('InfoRequestBody', None)
3061        if not req:
3062            raise service_error(service_error.req,
3063                    "Bad request format (no InfoRequestBody)")
3064        exp = req.get('experiment', None)
3065        if exp:
3066            if exp.has_key('fedid'):
3067                key = exp['fedid']
3068                keytype = "fedid"
3069            elif exp.has_key('localname'):
3070                key = exp['localname']
3071                keytype = "localname"
3072            else:
3073                raise service_error(service_error.req, "Unknown lookup type")
3074        else:
3075            raise service_error(service_error.req, "No request?")
3076
3077        self.check_experiment_access(fid, key)
3078
3079        # The state may be massaged by the service function that called
3080        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
3081        # state.
3082        self.state_lock.acquire()
3083        if self.state.has_key(key):
3084            rv = copy.deepcopy(self.state[key])
3085        self.state_lock.release()
3086
3087        if rv:
3088            return self.clean_info_response(rv)
3089        else:
3090            raise service_error(service_error.req, "No such experiment")
3091
3092    def get_multi_info(self, req, fid):
3093        """
3094        Return all the stored info that this fedid can access
3095        """
3096        rv = { 'info': [ ] }
3097
3098        self.state_lock.acquire()
3099        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
3100            self.check_experiment_access(fid, key)
3101
3102            if self.state.has_key(key):
3103                e = copy.deepcopy(self.state[key])
3104                e = self.clean_info_response(e)
3105                rv['info'].append(e)
3106        self.state_lock.release()
3107        return rv
3108
3109
3110    def terminate_experiment(self, req, fid):
3111        """
3112        Swap this experiment out on the federants and delete the shared
3113        information
3114        """
3115        tbparams = { }
3116        req = req.get('TerminateRequestBody', None)
3117        if not req:
3118            raise service_error(service_error.req,
3119                    "Bad request format (no TerminateRequestBody)")
3120        force = req.get('force', False)
3121        exp = req.get('experiment', None)
3122        if exp:
3123            if exp.has_key('fedid'):
3124                key = exp['fedid']
3125                keytype = "fedid"
3126            elif exp.has_key('localname'):
3127                key = exp['localname']
3128                keytype = "localname"
3129            else:
3130                raise service_error(service_error.req, "Unknown lookup type")
3131        else:
3132            raise service_error(service_error.req, "No request?")
3133
3134        self.check_experiment_access(fid, key)
3135
3136        dealloc_list = [ ]
3137
3138
3139        # Create a logger that logs to the dealloc_list as well as to the main
3140        # log file.
3141        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3142        h = logging.StreamHandler(self.list_log(dealloc_list))
3143        # XXX: there should be a global one of these rather than repeating the
3144        # code.
3145        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3146                    '%d %b %y %H:%M:%S'))
3147        dealloc_log.addHandler(h)
3148
3149        self.state_lock.acquire()
3150        fed_exp = self.state.get(key, None)
3151
3152        if fed_exp:
3153            # This branch of the conditional holds the lock to generate a
3154            # consistent temporary tbparams variable to deallocate experiments.
3155            # It releases the lock to do the deallocations and reacquires it to
3156            # remove the experiment state when the termination is complete.
3157
3158            # First make sure that the experiment creation is complete.
3159            status = fed_exp.get('experimentStatus', None)
3160
3161            if status:
3162                if status in ('starting', 'terminating'):
3163                    if not force:
3164                        self.state_lock.release()
3165                        raise service_error(service_error.partial, 
3166                                'Experiment still being created or destroyed')
3167                    else:
3168                        self.log.warning('Experiment in %s state ' % status + \
3169                                'being terminated by force.')
3170            else:
3171                # No status??? trouble
3172                self.state_lock.release()
3173                raise service_error(service_error.internal,
3174                        "Experiment has no status!?")
3175
3176            ids = []
3177            #  experimentID is a list of dicts that are self-describing
3178            #  identifiers.  This finds all the fedids and localnames - the
3179            #  keys of self.state - and puts them into ids.
3180            for id in fed_exp.get('experimentID', []):
3181                if id.has_key('fedid'): ids.append(id['fedid'])
3182                if id.has_key('localname'): ids.append(id['localname'])
3183
3184            # Construct enough of the tbparams to make the stop_segment calls
3185            # work
3186            for fed in fed_exp.get('federant', []):
3187                try:
3188                    for e in fed['name']:
3189                        eid = e.get('localname', None)
3190                        if eid: break
3191                    else:
3192                        continue
3193
3194                    p = fed['emulab']['project']
3195
3196                    project = p['name']['localname']
3197                    tb = p['testbed']['localname']
3198                    user = p['user'][0]['userID']['localname']
3199
3200                    domain = fed['emulab']['domain']
3201                    host  = fed['emulab']['ops']
3202                    aid = fed['allocID']
3203                except KeyError, e:
3204                    continue
3205                tbparams[tb] = {\
3206                        'user': user,\
3207                        'domain': domain,\
3208                        'project': project,\
3209                        'host': host,\
3210                        'eid': eid,\
3211                        'aid': aid,\
3212                    }
3213            fed_exp['experimentStatus'] = 'terminating'
3214            if self.state_filename: self.write_state()
3215            self.state_lock.release()
3216
3217            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3218            # then completes, so we can't wait if nothing starts.  So, no
3219            # tbparams, no start.
3220            if len(tbparams) > 0:
3221                thread_pool = self.thread_pool(self.nthreads)
3222                for tb in tbparams.keys():
3223                    # Create and start a thread to stop the segment
3224                    thread_pool.wait_for_slot()
3225                    t  = self.pooled_thread(\
3226                            target=self.stop_segment(log=dealloc_log,
3227                                keyfile=self.ssh_privkey_file, debug=self.debug), 
3228                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
3229                            pdata=thread_pool, trace_file=self.trace_file)
3230                    t.start()
3231                # Wait for completions
3232                thread_pool.wait_for_all_done()
3233
3234            # release the allocations (failed experiments have done this
3235            # already, and starting experiments may be in odd states, so we
3236            # ignore errors releasing those allocations
3237            try: 
3238                for tb in tbparams.keys():
3239                    self.release_access(tb, tbparams[tb]['aid'])
3240            except service_error, e:
3241                if status != 'failed' and not force:
3242                    raise e
3243
3244            # Remove the terminated experiment
3245            self.state_lock.acquire()
3246            for id in ids:
3247                if self.state.has_key(id): del self.state[id]
3248
3249            if self.state_filename: self.write_state()
3250            self.state_lock.release()
3251
3252            return { 
3253                    'experiment': exp , 
3254                    'deallocationLog': "".join(dealloc_list),
3255                    }
3256        else:
3257            # Don't forget to release the lock
3258            self.state_lock.release()
3259            raise service_error(service_error.req, "No saved state")
3260
3261    def new_terminate_experiment(self, req, fid):
3262        """
3263        Swap this experiment out on the federants and delete the shared
3264        information
3265        """
3266        tbparams = { }
3267        req = req.get('TerminateRequestBody', None)
3268        if not req:
3269            raise service_error(service_error.req,
3270                    "Bad request format (no TerminateRequestBody)")
3271        force = req.get('force', False)
3272        exp = req.get('experiment', None)
3273        if exp:
3274            if exp.has_key('fedid'):
3275                key = exp['fedid']
3276                keytype = "fedid"
3277            elif exp.has_key('localname'):
3278                key = exp['localname']
3279                keytype = "localname"
3280            else:
3281                raise service_error(service_error.req, "Unknown lookup type")
3282        else:
3283            raise service_error(service_error.req, "No request?")
3284
3285        self.check_experiment_access(fid, key)
3286
3287        dealloc_list = [ ]
3288
3289
3290        # Create a logger that logs to the dealloc_list as well as to the main
3291        # log file.
3292        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3293        h = logging.StreamHandler(self.list_log(dealloc_list))
3294        # XXX: there should be a global one of these rather than repeating the
3295        # code.
3296        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3297                    '%d %b %y %H:%M:%S'))
3298        dealloc_log.addHandler(h)
3299
3300        self.state_lock.acquire()
3301        fed_exp = self.state.get(key, None)
3302
3303        if fed_exp:
3304            # This branch of the conditional holds the lock to generate a
3305            # consistent temporary tbparams variable to deallocate experiments.
3306            # It releases the lock to do the deallocations and reacquires it to
3307            # remove the experiment state when the termination is complete.
3308
3309            # First make sure that the experiment creation is complete.
3310            status = fed_exp.get('experimentStatus', None)
3311
3312            if status:
3313                if status in ('starting', 'terminating'):
3314                    if not force:
3315                        self.state_lock.release()
3316                        raise service_error(service_error.partial, 
3317                                'Experiment still being created or destroyed')
3318                    else:
3319                        self.log.warning('Experiment in %s state ' % status + \
3320                                'being terminated by force.')
3321            else:
3322                # No status??? trouble
3323                self.state_lock.release()
3324                raise service_error(service_error.internal,
3325                        "Experiment has no status!?")
3326
3327            ids = []
3328            #  experimentID is a list of dicts that are self-describing
3329            #  identifiers.  This finds all the fedids and localnames - the
3330            #  keys of self.state - and puts them into ids.
3331            for id in fed_exp.get('experimentID', []):
3332                if id.has_key('fedid'): ids.append(id['fedid'])
3333                if id.has_key('localname'): ids.append(id['localname'])
3334
3335            # Collect the allocation/segment ids
3336            for fed in fed_exp.get('federant', []):
3337                try:
3338                    print "looking at %s" % fed
3339                    tb = fed['emulab']['project']['testbed']['localname']
3340                    aid = fed['allocID']
3341                except KeyError, e:
3342                    print "Key error: %s" %e
3343                    continue
3344                tbparams[tb] = aid
3345            fed_exp['experimentStatus'] = 'terminating'
3346            if self.state_filename: self.write_state()
3347            self.state_lock.release()
3348
3349            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3350            # then completes, so we can't wait if nothing starts.  So, no
3351            # tbparams, no start.
3352            if len(tbparams) > 0:
3353                thread_pool = self.thread_pool(self.nthreads)
3354                for tb in tbparams.keys():
3355                    # Create and start a thread to stop the segment
3356                    thread_pool.wait_for_slot()
3357                    uri = self.tbmap.get(tb, None)
3358                    t  = self.pooled_thread(\
3359                            target=self.new_terminate_segment(log=dealloc_log,
3360                                cert_file=self.cert_file, 
3361                                cert_pwd=self.cert_pwd,
3362                                trusted_certs=self.trusted_certs,
3363                                caller=self.call_TerminateSegment),
3364                            args=(uri, tbparams[tb]), name=tb,
3365                            pdata=thread_pool, trace_file=self.trace_file)
3366                    t.start()
3367                # Wait for completions
3368                thread_pool.wait_for_all_done()
3369
3370            # release the allocations (failed experiments have done this
3371            # already, and starting experiments may be in odd states, so we
3372            # ignore errors releasing those allocations
3373            try: 
3374                for tb in tbparams.keys():
3375                    self.release_access(tb, tbparams[tb])
3376            except service_error, e:
3377                if status != 'failed' and not force:
3378                    raise e
3379
3380            # Remove the terminated experiment
3381            self.state_lock.acquire()
3382            for id in ids:
3383                if self.state.has_key(id): del self.state[id]
3384
3385            if self.state_filename: self.write_state()
3386            self.state_lock.release()
3387
3388            return { 
3389                    'experiment': exp , 
3390                    'deallocationLog': "".join(dealloc_list),
3391                    }
3392        else:
3393            # Don't forget to release the lock
3394            self.state_lock.release()
3395            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.