source: fedd/federation/experiment_control.py @ fd556d1

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since fd556d1 was fd556d1, checked in by Ted Faber <faber@…>, 15 years ago

Properly communicate errors

  • Property mode set to 100644
File size: 130.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_TerminateSegment = service_caller('TerminateSegment')
200    call_Ns2Split = service_caller('Ns2Split')
201
202    def __init__(self, config=None, auth=None):
203        """
204        Intialize the various attributes, most from the config object
205        """
206
207        def parse_tarfile_list(tf):
208            """
209            Parse a tarfile list from the configuration.  This is a set of
210            paths and tarfiles separated by spaces.
211            """
212            rv = [ ]
213            if tf is not None:
214                tl = tf.split()
215                while len(tl) > 1:
216                    p, t = tl[0:2]
217                    del tl[0:2]
218                    rv.append((p, t))
219            return rv
220
221        self.thread_with_rv = experiment_control_local.pooled_thread
222        self.thread_pool = experiment_control_local.thread_pool
223        self.list_log = experiment_control_local.list_log
224
225        self.cert_file = config.get("experiment_control", "cert_file")
226        if self.cert_file:
227            self.cert_pwd = config.get("experiment_control", "cert_pwd")
228        else:
229            self.cert_file = config.get("globals", "cert_file")
230            self.cert_pwd = config.get("globals", "cert_pwd")
231
232        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
233                or config.get("globals", "trusted_certs")
234
235        self.repodir = config.get("experiment_control", "repodir")
236
237        self.exp_stem = "fed-stem"
238        self.log = logging.getLogger("fedd.experiment_control")
239        set_log_level(config, "experiment_control", self.log)
240        self.muxmax = 2
241        self.nthreads = 2
242        self.randomize_experiments = False
243
244        self.splitter = None
245        self.ssh_keygen = "/usr/bin/ssh-keygen"
246        self.ssh_identity_file = None
247
248
249        self.debug = config.getboolean("experiment_control", "create_debug")
250        self.state_filename = config.get("experiment_control", 
251                "experiment_state")
252        self.splitter_url = config.get("experiment_control", "splitter_uri")
253        self.fedkit = parse_tarfile_list(\
254                config.get("experiment_control", "fedkit"))
255        self.gatewaykit = parse_tarfile_list(\
256                config.get("experiment_control", "gatewaykit"))
257        accessdb_file = config.get("experiment_control", "accessdb")
258
259        self.ssh_pubkey_file = config.get("experiment_control", 
260                "ssh_pubkey_file")
261        self.ssh_privkey_file = config.get("experiment_control",
262                "ssh_privkey_file")
263        # NB for internal master/slave ops, not experiment setup
264        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
265
266        self.overrides = set([])
267        ovr = config.get('experiment_control', 'overrides')
268        if ovr:
269            for o in ovr.split(","):
270                o = o.strip()
271                if o.startswith('fedid:'): o = o[len('fedid:'):]
272                self.overrides.add(fedid(hexstr=o))
273
274        self.state = { }
275        self.state_lock = Lock()
276        self.tclsh = "/usr/local/bin/otclsh"
277        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
278                config.get("experiment_control", "tcl_splitter",
279                        "/usr/testbed/lib/ns2ir/parse.tcl")
280        mapdb_file = config.get("experiment_control", "mapdb")
281        self.trace_file = sys.stderr
282
283        self.def_expstart = \
284                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
285                "/tmp/federate";
286        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
287                "FEDDIR/hosts";
288        self.def_gwstart = \
289                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
290                "/tmp/bridge.log";
291        self.def_mgwstart = \
292                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
293                "/tmp/bridge.log";
294        self.def_gwimage = "FBSD61-TUNNEL2";
295        self.def_gwtype = "pc";
296        self.local_access = { }
297
298        if auth:
299            self.auth = auth
300        else:
301            self.log.error(\
302                    "[access]: No authorizer initialized, creating local one.")
303            auth = authorizer()
304
305
306        if self.ssh_pubkey_file:
307            try:
308                f = open(self.ssh_pubkey_file, 'r')
309                self.ssh_pubkey = f.read()
310                f.close()
311            except IOError:
312                raise service_error(service_error.internal,
313                        "Cannot read sshpubkey")
314        else:
315            raise service_error(service_error.internal, 
316                    "No SSH public key file?")
317
318        if not self.ssh_privkey_file:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322
323        if mapdb_file:
324            self.read_mapdb(mapdb_file)
325        else:
326            self.log.warn("[experiment_control] No testbed map, using defaults")
327            self.tbmap = { 
328                    'deter':'https://users.isi.deterlab.net:23235',
329                    'emulab':'https://users.isi.deterlab.net:23236',
330                    'ucb':'https://users.isi.deterlab.net:23237',
331                    }
332
333        if accessdb_file:
334                self.read_accessdb(accessdb_file)
335        else:
336            raise service_error(service_error.internal,
337                    "No accessdb specified in config")
338
339        # Grab saved state.  OK to do this w/o locking because it's read only
340        # and only one thread should be in existence that can see self.state at
341        # this point.
342        if self.state_filename:
343            self.read_state()
344
345        # Dispatch tables
346        self.soap_services = {\
347                'Create': soap_handler('Create', self.new_create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.new_terminate_experiment),
354        }
355
356        self.xmlrpc_services = {\
357                'Create': xmlrpc_handler('Create', self.new_create_experiment),
358                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
359                'Vis': xmlrpc_handler('Vis', self.get_vis),
360                'Info': xmlrpc_handler('Info', self.get_info),
361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
362                'Terminate': xmlrpc_handler('Terminate',
363                    self.new_terminate_experiment),
364        }
365
366    def copy_file(self, src, dest, size=1024):
367        """
368        Exceedingly simple file copy.
369        """
370        s = open(src,'r')
371        d = open(dest, 'w')
372
373        buf = "x"
374        while buf != "":
375            buf = s.read(size)
376            d.write(buf)
377        s.close()
378        d.close()
379
380    # Call while holding self.state_lock
381    def write_state(self):
382        """
383        Write a new copy of experiment state after copying the existing state
384        to a backup.
385
386        State format is a simple pickling of the state dictionary.
387        """
388        if os.access(self.state_filename, os.W_OK):
389            self.copy_file(self.state_filename, \
390                    "%s.bak" % self.state_filename)
391        try:
392            f = open(self.state_filename, 'w')
393            pickle.dump(self.state, f)
394        except IOError, e:
395            self.log.error("Can't write file %s: %s" % \
396                    (self.state_filename, e))
397        except pickle.PicklingError, e:
398            self.log.error("Pickling problem: %s" % e)
399        except TypeError, e:
400            self.log.error("Pickling problem (TypeError): %s" % e)
401
402    # Call while holding self.state_lock
403    def read_state(self):
404        """
405        Read a new copy of experiment state.  Old state is overwritten.
406
407        State format is a simple pickling of the state dictionary.
408        """
409       
410        def get_experiment_id(state):
411            """
412            Pull the fedid experimentID out of the saved state.  This is kind
413            of a gross walk through the dict.
414            """
415
416            if state.has_key('experimentID'):
417                for e in state['experimentID']:
418                    if e.has_key('fedid'):
419                        return e['fedid']
420                else:
421                    return None
422            else:
423                return None
424
425        def get_alloc_ids(state):
426            """
427            Pull the fedids of the identifiers of each allocation from the
428            state.  Again, a dict dive that's best isolated.
429            """
430
431            return [ f['allocID']['fedid'] 
432                    for f in state.get('federant',[]) \
433                        if f.has_key('allocID') and \
434                            f['allocID'].has_key('fedid')]
435
436
437        try:
438            f = open(self.state_filename, "r")
439            self.state = pickle.load(f)
440            self.log.debug("[read_state]: Read state from %s" % \
441                    self.state_filename)
442        except IOError, e:
443            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
444                    % (self.state_filename, e))
445        except pickle.UnpicklingError, e:
446            self.log.warning(("[read_state]: No saved state: " + \
447                    "Unpickling failed: %s") % e)
448       
449        for s in self.state.values():
450            try:
451
452                eid = get_experiment_id(s)
453                if eid : 
454                    # Give the owner rights to the experiment
455                    self.auth.set_attribute(s['owner'], eid)
456                    # And holders of the eid as well
457                    self.auth.set_attribute(eid, eid)
458                    # allow overrides to control experiments as well
459                    for o in self.overrides:
460                        self.auth.set_attribute(o, eid)
461                    # Set permissions to allow reading of the software repo, if
462                    # any, as well.
463                    for a in get_alloc_ids(s):
464                        self.auth.set_attribute(a, 'repo/%s' % eid)
465                else:
466                    raise KeyError("No experiment id")
467            except KeyError, e:
468                self.log.warning("[read_state]: State ownership or identity " +\
469                        "misformatted in %s: %s" % (self.state_filename, e))
470
471
472    def read_accessdb(self, accessdb_file):
473        """
474        Read the mapping from fedids that can create experiments to their name
475        in the 3-level access namespace.  All will be asserted from this
476        testbed and can include the local username and porject that will be
477        asserted on their behalf by this fedd.  Each fedid is also added to the
478        authorization system with the "create" attribute.
479        """
480        self.accessdb = {}
481        # These are the regexps for parsing the db
482        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
483        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
484                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
485        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
486                "\s*->\s*(" + name_expr + ")\s*$")
487        lineno = 0
488
489        # Parse the mappings and store in self.authdb, a dict of
490        # fedid -> (proj, user)
491        try:
492            f = open(accessdb_file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if len(line) == 0 or line.startswith('#'):
497                    continue
498                m = project_line.match(line)
499                if m:
500                    fid = fedid(hexstr=m.group(1))
501                    project, user = m.group(2,3)
502                    if not self.accessdb.has_key(fid):
503                        self.accessdb[fid] = []
504                    self.accessdb[fid].append((project, user))
505                    continue
506
507                m = user_line.match(line)
508                if m:
509                    fid = fedid(hexstr=m.group(1))
510                    project = None
511                    user = m.group(2)
512                    if not self.accessdb.has_key(fid):
513                        self.accessdb[fid] = []
514                    self.accessdb[fid].append((project, user))
515                    continue
516                self.log.warn("[experiment_control] Error parsing access " +\
517                        "db %s at line %d" %  (accessdb_file, lineno))
518        except IOError:
519            raise service_error(service_error.internal,
520                    "Error opening/reading %s as experiment " +\
521                            "control accessdb" %  accessdb_file)
522        f.close()
523
524        # Initialize the authorization attributes
525        for fid in self.accessdb.keys():
526            self.auth.set_attribute(fid, 'create')
527
528    def read_mapdb(self, file):
529        """
530        Read a simple colon separated list of mappings for the
531        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
532        """
533
534        self.tbmap = { }
535        lineno =0
536        try:
537            f = open(file, "r")
538            for line in f:
539                lineno += 1
540                line = line.strip()
541                if line.startswith('#') or len(line) == 0:
542                    continue
543                try:
544                    label, url = line.split(':', 1)
545                    self.tbmap[label] = url
546                except ValueError, e:
547                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
548                            "map db: %s %s" % (lineno, line, e))
549        except IOError, e:
550            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
551                    "open %s: %s" % (file, e))
552        f.close()
553
554    class emulab_segment:
555        def __init__(self, log=None, keyfile=None, debug=False):
556            self.log = log or logging.getLogger(\
557                    'fedd.experiment_control.emulab_segment')
558            self.ssh_privkey_file = keyfile
559            self.debug = debug
560            self.ssh_exec="/usr/bin/ssh"
561            self.scp_exec = "/usr/bin/scp"
562            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
563
564        def scp_file(self, file, user, host, dest=""):
565            """
566            scp a file to the remote host.  If debug is set the action is only
567            logged.
568            """
569
570            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
571                    '-o', 'StrictHostKeyChecking yes', '-i', 
572                    self.ssh_privkey_file, file, 
573                    "%s@%s:%s" % (user, host, dest)]
574            rv = 0
575
576            try:
577                dnull = open("/dev/null", "w")
578            except IOError:
579                self.log.debug("[ssh_file]: failed to open " + \
580                        "/dev/null for redirect")
581                dnull = Null
582
583            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
584            if not self.debug:
585                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
586                        close_fds=True)
587
588            return rv == 0
589
590        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
591            """
592            Run a remote command on host as user.  If debug is set, the action
593            is only logged.  Commands are run without stdin, to avoid stray
594            SIGTTINs.
595            """
596            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
597                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
598                    (self.ssh_exec, self.ssh_privkey_file, 
599                            user, host, cmd)
600
601            try:
602                dnull = open("/dev/null", "w")
603            except IOError:
604                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
605                        "for redirect")
606                dnull = Null
607
608            self.log.debug("[ssh_cmd]: %s" % sh_str)
609            if not self.debug:
610                if dnull:
611                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
612                            close_fds=True)
613                else:
614                    sub = Popen(sh_str, shell=True,
615                            close_fds=True)
616                if timeout:
617                    i = 0
618                    rv = sub.poll()
619                    while i < timeout:
620                        if rv is not None: break
621                        else:
622                            time.sleep(1)
623                            rv = sub.poll()
624                            i += 1
625                    else:
626                        self.log.debug("Process exceeded runtime: %s" % sh_str)
627                        os.kill(sub.pid, signal.SIGKILL)
628                        raise self.ssh_cmd_timeout();
629                    return rv == 0
630                else:
631                    return sub.wait() == 0
632            else:
633                if timeout == 0:
634                    self.log.debug("debug timeout raised on %s " % sh_str)
635                    raise self.ssh_cmd_timeout()
636                else:
637                    return True
638
639    class start_segment(emulab_segment):
640        def __init__(self, log=None, keyfile=None, debug=False):
641            experiment_control_local.emulab_segment.__init__(self,
642                    log=log, keyfile=keyfile, debug=debug)
643
644        def create_config_tree(self, src_dir, dest_dir, script):
645            """
646            Append commands to script that will create the directory hierarchy
647            on the remote federant.
648            """
649
650            if os.path.isdir(src_dir):
651                print >>script, "mkdir -p %s" % dest_dir
652                print >>script, "chmod 770 %s" % dest_dir
653
654                for f in os.listdir(src_dir):
655                    if os.path.isdir(f):
656                        self.create_config_tree("%s/%s" % (src_dir, f), 
657                                "%s/%s" % (dest_dir, f), script)
658            else:
659                self.log.debug("[create_config_tree]: Not a directory: %s" \
660                        % src_dir)
661
662        def ship_configs(self, host, user, src_dir, dest_dir):
663            """
664            Copy federant-specific configuration files to the federant.
665            """
666            for f in os.listdir(src_dir):
667                if os.path.isdir(f):
668                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
669                            "%s/%s" % (dest_dir, f)):
670                        return False
671                else:
672                    if not self.scp_file("%s/%s" % (src_dir, f), 
673                            user, host, dest_dir):
674                        return False
675            return True
676
677        def get_state(self, user, host, tb, pid, eid):
678            # command to test experiment state
679            expinfo_exec = "/usr/testbed/bin/expinfo" 
680            # Regular expressions to parse the expinfo response
681            state_re = re.compile("State:\s+(\w+)")
682            no_exp_re = re.compile("^No\s+such\s+experiment")
683            swapping_re = re.compile("^No\s+information\s+available.")
684            state = None    # Experiment state parsed from expinfo
685            # The expinfo ssh command.  Note the identity restriction to use
686            # only the identity provided in the pubkey given.
687            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
688                    'StrictHostKeyChecking yes', '-i', 
689                    self.ssh_privkey_file, "%s@%s" % (user, host), 
690                    expinfo_exec, pid, eid]
691
692            dev_null = None
693            try:
694                dev_null = open("/dev/null", "a")
695            except IOError, e:
696                self.log.error("[get_state]: can't open /dev/null: %s" %e)
697
698            if self.debug:
699                state = 'swapped'
700                rv = 0
701            else:
702                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
703                        close_fds=True)
704                for line in status.stdout:
705                    m = state_re.match(line)
706                    if m: state = m.group(1)
707                    else:
708                        for reg, st in ((no_exp_re, "none"),
709                                (swapping_re, "swapping")):
710                            m = reg.match(line)
711                            if m: state = st
712                rv = status.wait()
713
714            # If the experiment is not present the subcommand returns a
715            # non-zero return value.  If we successfully parsed a "none"
716            # outcome, ignore the return code.
717            if rv != 0 and state != 'none':
718                raise service_error(service_error.internal,
719                        "Cannot get status of segment %s:%s/%s" % \
720                                (tb, pid, eid))
721            elif state not in ('active', 'swapped', 'swapping', 'none'):
722                raise service_error(service_error.internal,
723                        "Cannot get status of segment %s:%s/%s" % \
724                                (tb, pid, eid))
725            else: return state
726
727
728        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
729            """
730            Start a sub-experiment on a federant.
731
732            Get the current state, modify or create as appropriate, ship data
733            and configs and start the experiment.  There are small ordering
734            differences based on the initial state of the sub-experiment.
735            """
736            # ops node in the federant
737            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
738            user = tbparams[tb]['user']     # federant user
739            pid = tbparams[tb]['project']   # federant project
740            # XXX
741            base_confs = ( "hosts",)
742            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
743            # Configuration directories on the remote machine
744            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
745            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
746            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
747
748            state = self.get_state(user, host, tb, pid, eid)
749
750            self.log.debug("[start_segment]: %s: %s" % (tb, state))
751            self.log.info("[start_segment]:transferring experiment to %s" % tb)
752
753            if not self.scp_file("%s/%s/%s" % \
754                    (tmpdir, tb, tclfile), user, host):
755                return False
756           
757            if state == 'none':
758                # Create a null copy of the experiment so that we capture any
759                # logs there if the modify fails.  Emulab software discards the
760                # logs from a failed startexp
761                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
762                    return False
763                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
764                timedout = False
765                try:
766                    if not self.ssh_cmd(user, host,
767                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
768                            "-e %s null.tcl") % (pid, eid), "startexp",
769                            timeout=60 * 10):
770                        return False
771                except self.ssh_cmd_timeout:
772                    timedout = True
773
774                if timedout:
775                    state = self.get_state(user, host, tb, pid, eid)
776                    if state != "swapped":
777                        return False
778
779           
780            # Open up a temporary file to contain a script for setting up the
781            # filespace for the new experiment.
782            self.log.info("[start_segment]: creating script file")
783            try:
784                sf, scriptname = tempfile.mkstemp()
785                scriptfile = os.fdopen(sf, 'w')
786            except IOError:
787                return False
788
789            scriptbase = os.path.basename(scriptname)
790
791            # Script the filesystem changes
792            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
793            # Clear and create the tarfiles and rpm directories
794            for d in (tarfiles_dir, rpms_dir):
795                print >>scriptfile, "/bin/rm -rf %s/*" % d
796                print >>scriptfile, "mkdir -p %s" % d
797            print >>scriptfile, 'mkdir -p %s' % proj_dir
798            self.create_config_tree("%s/%s" % (tmpdir, tb),
799                    proj_dir, scriptfile)
800            if os.path.isdir("%s/tarfiles" % tmpdir):
801                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
802                        scriptfile)
803            if os.path.isdir("%s/rpms" % tmpdir):
804                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
805                        scriptfile)
806            print >>scriptfile, "rm -f %s" % scriptbase
807            scriptfile.close()
808
809            # Move the script to the remote machine
810            # XXX: could collide tempfile names on the remote host
811            if self.scp_file(scriptname, user, host, scriptbase):
812                os.remove(scriptname)
813            else:
814                return False
815
816            # Execute the script (and the script's last line deletes it)
817            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
818                return False
819
820            for f in base_confs:
821                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
822                        "%s/%s" % (proj_dir, f)):
823                    return False
824            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
825                    proj_dir):
826                return False
827            if os.path.isdir("%s/tarfiles" % tmpdir):
828                if not self.ship_configs(host, user,
829                        "%s/tarfiles" % tmpdir, tarfiles_dir):
830                    return False
831            if os.path.isdir("%s/rpms" % tmpdir):
832                if not self.ship_configs(host, user,
833                        "%s/rpms" % tmpdir, tarfiles_dir):
834                    return False
835            # Stage the new configuration (active experiments will stay swapped
836            # in now)
837            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
838            try:
839                if not self.ssh_cmd(user, host,
840                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
841                                (pid, eid, tclfile),
842                        "modexp", timeout= 60 * 10):
843                    return False
844            except self.ssh_cmd_timeout:
845                self.log.error("Modify command failed to complete in time")
846                # There's really no way to see if this succeeded or failed, so
847                # if it hangs, assume the worst.
848                return False
849            # Active experiments are still swapped, this swaps the others in.
850            if state != 'active':
851                self.log.info("[start_segment]: Swapping %s in on %s" % \
852                        (eid, tb))
853                timedout = False
854                try:
855                    if not self.ssh_cmd(user, host,
856                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
857                            "swapexp", timeout=10*60):
858                        return False
859                except self.ssh_cmd_timeout:
860                    timedout = True
861               
862                # If the command was terminated, but completed successfully,
863                # report success.
864                if timedout:
865                    self.log.debug("[start_segment]: swapin timed out " +\
866                            "checking state")
867                    state = self.get_state(user, host, tb, pid, eid)
868                    self.log.debug("[start_segment]: state is %s" % state)
869                    return state == 'active'
870            # Everything has gone OK.
871            return True
872
873    class stop_segment(emulab_segment):
874        def __init__(self, log=None, keyfile=None, debug=False):
875            experiment_control_local.emulab_segment.__init__(self,
876                    log=log, keyfile=keyfile, debug=debug)
877
878        def __call__(self, tb, eid, tbparams):
879            """
880            Stop a sub experiment by calling swapexp on the federant
881            """
882            user = tbparams[tb]['user']
883            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
884            pid = tbparams[tb]['project']
885
886            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
887            rv = False
888            try:
889                # Clean out tar files: we've gone over quota in the past
890                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
891                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
892                        (pid, eid))
893                rv = self.ssh_cmd(user, host,
894                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
895            except self.ssh_cmd_timeout:
896                rv = False
897            return rv
898
899       
900    def generate_ssh_keys(self, dest, type="rsa" ):
901        """
902        Generate a set of keys for the gateways to use to talk.
903
904        Keys are of type type and are stored in the required dest file.
905        """
906        valid_types = ("rsa", "dsa")
907        t = type.lower();
908        if t not in valid_types: raise ValueError
909        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
910
911        try:
912            trace = open("/dev/null", "w")
913        except IOError:
914            raise service_error(service_error.internal,
915                    "Cannot open /dev/null??");
916
917        # May raise CalledProcessError
918        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
919        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
920        if rv != 0:
921            raise service_error(service_error.internal, 
922                    "Cannot generate nonce ssh keys.  %s return code %d" \
923                            % (self.ssh_keygen, rv))
924
925    def gentopo(self, str):
926        """
927        Generate the topology dtat structure from the splitter's XML
928        representation of it.
929
930        The topology XML looks like:
931            <experiment>
932                <nodes>
933                    <node><vname></vname><ips>ip1:ip2</ips></node>
934                </nodes>
935                <lans>
936                    <lan>
937                        <vname></vname><vnode></vnode><ip></ip>
938                        <bandwidth></bandwidth><member>node:port</member>
939                    </lan>
940                </lans>
941        """
942        class topo_parse:
943            """
944            Parse the topology XML and create the dats structure.
945            """
946            def __init__(self):
947                # Typing of the subelements for data conversion
948                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
949                self.int_subelements = ( 'bandwidth',)
950                self.float_subelements = ( 'delay',)
951                # The final data structure
952                self.nodes = [ ]
953                self.lans =  [ ]
954                self.topo = { \
955                        'node': self.nodes,\
956                        'lan' : self.lans,\
957                    }
958                self.element = { }  # Current element being created
959                self.chars = ""     # Last text seen
960
961            def end_element(self, name):
962                # After each sub element the contents is added to the current
963                # element or to the appropriate list.
964                if name == 'node':
965                    self.nodes.append(self.element)
966                    self.element = { }
967                elif name == 'lan':
968                    self.lans.append(self.element)
969                    self.element = { }
970                elif name in self.str_subelements:
971                    self.element[name] = self.chars
972                    self.chars = ""
973                elif name in self.int_subelements:
974                    self.element[name] = int(self.chars)
975                    self.chars = ""
976                elif name in self.float_subelements:
977                    self.element[name] = float(self.chars)
978                    self.chars = ""
979
980            def found_chars(self, data):
981                self.chars += data.rstrip()
982
983
984        tp = topo_parse();
985        parser = xml.parsers.expat.ParserCreate()
986        parser.EndElementHandler = tp.end_element
987        parser.CharacterDataHandler = tp.found_chars
988
989        parser.Parse(str)
990
991        return tp.topo
992       
993
994    def genviz(self, topo):
995        """
996        Generate the visualization the virtual topology
997        """
998
999        neato = "/usr/local/bin/neato"
1000        # These are used to parse neato output and to create the visualization
1001        # file.
1002        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
1003        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
1004                "%s</type></node>"
1005
1006        try:
1007            # Node names
1008            nodes = [ n['vname'] for n in topo['node'] ]
1009            topo_lans = topo['lan']
1010        except KeyError, e:
1011            raise service_error(service_error.internal, "Bad topology: %s" %e)
1012
1013        lans = { }
1014        links = { }
1015
1016        # Walk through the virtual topology, organizing the connections into
1017        # 2-node connections (links) and more-than-2-node connections (lans).
1018        # When a lan is created, it's added to the list of nodes (there's a
1019        # node in the visualization for the lan).
1020        for l in topo_lans:
1021            if links.has_key(l['vname']):
1022                if len(links[l['vname']]) < 2:
1023                    links[l['vname']].append(l['vnode'])
1024                else:
1025                    nodes.append(l['vname'])
1026                    lans[l['vname']] = links[l['vname']]
1027                    del links[l['vname']]
1028                    lans[l['vname']].append(l['vnode'])
1029            elif lans.has_key(l['vname']):
1030                lans[l['vname']].append(l['vnode'])
1031            else:
1032                links[l['vname']] = [ l['vnode'] ]
1033
1034
1035        # Open up a temporary file for dot to turn into a visualization
1036        try:
1037            df, dotname = tempfile.mkstemp()
1038            dotfile = os.fdopen(df, 'w')
1039        except IOError:
1040            raise service_error(service_error.internal,
1041                    "Failed to open file in genviz")
1042
1043        try:
1044            dnull = open('/dev/null', 'w')
1045        except IOError:
1046            service_error(service_error.internal,
1047                    "Failed to open /dev/null in genviz")
1048
1049        # Generate a dot/neato input file from the links, nodes and lans
1050        try:
1051            print >>dotfile, "graph G {"
1052            for n in nodes:
1053                print >>dotfile, '\t"%s"' % n
1054            for l in links.keys():
1055                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1056            for l in lans.keys():
1057                for n in lans[l]:
1058                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1059            print >>dotfile, "}"
1060            dotfile.close()
1061        except TypeError:
1062            raise service_error(service_error.internal,
1063                    "Single endpoint link in vtopo")
1064        except IOError:
1065            raise service_error(service_error.internal, "Cannot write dot file")
1066
1067        # Use dot to create a visualization
1068        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1069                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1070                close_fds=True)
1071        dnull.close()
1072
1073        # Translate dot to vis format
1074        vis_nodes = [ ]
1075        vis = { 'node': vis_nodes }
1076        for line in dot.stdout:
1077            m = vis_re.match(line)
1078            if m:
1079                vn = m.group(1)
1080                vis_node = {'name': vn, \
1081                        'x': float(m.group(2)),\
1082                        'y' : float(m.group(3)),\
1083                    }
1084                if vn in links.keys() or vn in lans.keys():
1085                    vis_node['type'] = 'lan'
1086                else:
1087                    vis_node['type'] = 'node'
1088                vis_nodes.append(vis_node)
1089        rv = dot.wait()
1090
1091        os.remove(dotname)
1092        if rv == 0 : return vis
1093        else: return None
1094
1095    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1096            access_user):
1097        """
1098        Get access to testbed through fedd and set the parameters for that tb
1099        """
1100        uri = self.tbmap.get(tb, None)
1101        if not uri:
1102            raise service_error(serice_error.server_config, 
1103                    "Unknown testbed: %s" % tb)
1104
1105        # currently this lumps all users into one service access group
1106        service_keys = [ a for u in user \
1107                for a in u.get('access', []) \
1108                    if a.has_key('sshPubkey')]
1109
1110        if len(service_keys) == 0:
1111            raise service_error(service_error.req, 
1112                    "Must have at least one SSH pubkey for services")
1113
1114
1115        for p, u in access_user:
1116            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1117                    "to %s") %  ((p or "None"), u, uri))
1118
1119            if p:
1120                # Request with user and project specified
1121                req = {\
1122                        'destinationTestbed' : { 'uri' : uri },
1123                        'project': { 
1124                            'name': {'localname': p},
1125                            'user': [ {'userID': { 'localname': u } } ],
1126                            },
1127                        'user':  user,
1128                        'allocID' : { 'localname': 'test' },
1129                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1130                        'serviceAccess' : service_keys
1131                    }
1132            else:
1133                # Request with only user specified
1134                req = {\
1135                        'destinationTestbed' : { 'uri' : uri },
1136                        'user':  [ {'userID': { 'localname': u } } ],
1137                        'allocID' : { 'localname': 'test' },
1138                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1139                        'serviceAccess' : service_keys
1140                    }
1141
1142            if tb == master:
1143                # NB, the export_project parameter is a dict that includes
1144                # the type
1145                req['exportProject'] = export_project
1146
1147            # node resources if any
1148            if nodes != None and len(nodes) > 0:
1149                rnodes = [ ]
1150                for n in nodes:
1151                    rn = { }
1152                    image, hw, count = n.split(":")
1153                    if image: rn['image'] = [ image ]
1154                    if hw: rn['hardware'] = [ hw ]
1155                    if count and int(count) >0 : rn['count'] = int(count)
1156                    rnodes.append(rn)
1157                req['resources']= { }
1158                req['resources']['node'] = rnodes
1159
1160            try:
1161                if self.local_access.has_key(uri):
1162                    # Local access call
1163                    req = { 'RequestAccessRequestBody' : req }
1164                    r = self.local_access[uri].RequestAccess(req, 
1165                            fedid(file=self.cert_file))
1166                    r = { 'RequestAccessResponseBody' : r }
1167                else:
1168                    r = self.call_RequestAccess(uri, req, 
1169                            self.cert_file, self.cert_pwd, self.trusted_certs)
1170            except service_error, e:
1171                if e.code == service_error.access:
1172                    self.log.debug("[get_access] Access denied")
1173                    r = None
1174                    continue
1175                else:
1176                    raise e
1177
1178            if r.has_key('RequestAccessResponseBody'):
1179                # Through to here we have a valid response, not a fault.
1180                # Access denied is a fault, so something better or worse than
1181                # access denied has happened.
1182                r = r['RequestAccessResponseBody']
1183                self.log.debug("[get_access] Access granted")
1184                break
1185            else:
1186                raise service_error(service_error.protocol,
1187                        "Bad proxy response")
1188       
1189        if not r:
1190            raise service_error(service_error.access, 
1191                    "Access denied by %s (%s)" % (tb, uri))
1192
1193        e = r['emulab']
1194        p = e['project']
1195        tbparam[tb] = { 
1196                "boss": e['boss'],
1197                "host": e['ops'],
1198                "domain": e['domain'],
1199                "fs": e['fileServer'],
1200                "eventserver": e['eventServer'],
1201                "project": unpack_id(p['name']),
1202                "emulab" : e,
1203                "allocID" : r['allocID'],
1204                }
1205        # Make the testbed name be the label the user applied
1206        p['testbed'] = {'localname': tb }
1207
1208        for u in p['user']:
1209            role = u.get('role', None)
1210            if role == 'experimentCreation':
1211                tbparam[tb]['user'] = unpack_id(u['userID'])
1212                break
1213        else:
1214            raise service_error(service_error.internal, 
1215                    "No createExperimentUser from %s" %tb)
1216
1217        # Add attributes to barameter space.  We don't allow attributes to
1218        # overlay any parameters already installed.
1219        for a in e['fedAttr']:
1220            try:
1221                if a['attribute'] and isinstance(a['attribute'], basestring)\
1222                        and not tbparam[tb].has_key(a['attribute'].lower()):
1223                    tbparam[tb][a['attribute'].lower()] = a['value']
1224            except KeyError:
1225                self.log.error("Bad attribute in response: %s" % a)
1226       
1227    def release_access(self, tb, aid):
1228        """
1229        Release access to testbed through fedd
1230        """
1231
1232        uri = self.tbmap.get(tb, None)
1233        if not uri:
1234            raise service_error(serice_error.server_config, 
1235                    "Unknown testbed: %s" % tb)
1236
1237        if self.local_access.has_key(uri):
1238            resp = self.local_access[uri].ReleaseAccess(\
1239                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1240                    fedid(file=self.cert_file))
1241            resp = { 'ReleaseAccessResponseBody': resp } 
1242        else:
1243            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1244                    self.cert_file, self.cert_pwd, self.trusted_certs)
1245
1246        # better error coding
1247
1248    def remote_splitter(self, uri, desc, master):
1249
1250        req = {
1251                'description' : { 'ns2description': desc },
1252                'master': master,
1253                'include_fedkit': bool(self.fedkit),
1254                'include_gatewaykit': bool(self.gatewaykit)
1255            }
1256
1257        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1258                self.trusted_certs)
1259
1260        if r.has_key('Ns2SplitResponseBody'):
1261            r = r['Ns2SplitResponseBody']
1262            if r.has_key('output'):
1263                return r['output'].splitlines()
1264            else:
1265                raise service_error(service_error.protocol, 
1266                        "Bad splitter response (no output)")
1267        else:
1268            raise service_error(service_error.protocol, "Bad splitter response")
1269       
1270    class current_testbed:
1271        """
1272        Object for collecting the current testbed description.  The testbed
1273        description is saved to a file with the local testbed variables
1274        subsittuted line by line.
1275        """
1276        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1277            def tar_list_to_string(tl):
1278                if tl is None: return None
1279
1280                rv = ""
1281                for t in tl:
1282                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1283                            (t[0], os.path.basename(t[1]))
1284                return rv
1285
1286
1287            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1288            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1289            self.current_testbed = None
1290            self.testbed_file = None
1291
1292            self.def_expstart = \
1293                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1294            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1295            self.def_gwstart = \
1296                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1297            self.def_mgwstart = \
1298                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1299            self.def_gwimage = "FBSD61-TUNNEL2";
1300            self.def_gwtype = "pc";
1301            self.def_mgwcmd = '# '
1302            self.def_mgwcmdparams = ''
1303            self.def_gwcmd = '# '
1304            self.def_gwcmdparams = ''
1305
1306            self.eid = eid
1307            self.tmpdir = tmpdir
1308            # Convert fedkit and gateway kit (which are lists of tuples) into a
1309            # substituition string.
1310            self.fedkit = tar_list_to_string(fedkit)
1311            self.gatewaykit = tar_list_to_string(gatewaykit)
1312
1313        def __call__(self, line, master, allocated, tbparams):
1314            # Capture testbed topology descriptions
1315            if self.current_testbed == None:
1316                m = self.begin_testbed.match(line)
1317                if m != None:
1318                    self.current_testbed = m.group(1)
1319                    if self.current_testbed == None:
1320                        raise service_error(service_error.req,
1321                                "Bad request format (unnamed testbed)")
1322                    allocated[self.current_testbed] = \
1323                            allocated.get(self.current_testbed,0) + 1
1324                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1325                    if not os.path.exists(tb_dir):
1326                        try:
1327                            os.mkdir(tb_dir)
1328                        except IOError:
1329                            raise service_error(service_error.internal,
1330                                    "Cannot create %s" % tb_dir)
1331                    try:
1332                        self.testbed_file = open("%s/%s.%s.tcl" %
1333                                (tb_dir, self.eid, self.current_testbed), 'w')
1334                    except IOError:
1335                        self.testbed_file = None
1336                    return True
1337                else: return False
1338            else:
1339                m = self.end_testbed.match(line)
1340                if m != None:
1341                    if m.group(1) != self.current_testbed:
1342                        raise service_error(service_error.internal, 
1343                                "Mismatched testbed markers!?")
1344                    if self.testbed_file != None: 
1345                        self.testbed_file.close()
1346                        self.testbed_file = None
1347                    self.current_testbed = None
1348                elif self.testbed_file:
1349                    # Substitute variables and put the line into the local
1350                    # testbed file.
1351                    gwtype = tbparams[self.current_testbed].get(\
1352                            'connectortype', self.def_gwtype)
1353                    gwimage = tbparams[self.current_testbed].get(\
1354                            'connectorimage', self.def_gwimage)
1355                    mgwstart = tbparams[self.current_testbed].get(\
1356                            'masterconnectorstartcmd', self.def_mgwstart)
1357                    mexpstart = tbparams[self.current_testbed].get(\
1358                            'masternodestartcmd', self.def_mexpstart)
1359                    gwstart = tbparams[self.current_testbed].get(\
1360                            'slaveconnectorstartcmd', self.def_gwstart)
1361                    expstart = tbparams[self.current_testbed].get(\
1362                            'slavenodestartcmd', self.def_expstart)
1363                    project = tbparams[self.current_testbed].get('project')
1364                    gwcmd = tbparams[self.current_testbed].get(\
1365                            'slaveconnectorcmd', self.def_gwcmd)
1366                    gwcmdparams = tbparams[self.current_testbed].get(\
1367                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1368                    mgwcmd = tbparams[self.current_testbed].get(\
1369                            'masterconnectorcmd', self.def_gwcmd)
1370                    mgwcmdparams = tbparams[self.current_testbed].get(\
1371                            'masterconnectorcmdparams', self.def_gwcmdparams)
1372                    line = re.sub("GWTYPE", gwtype, line)
1373                    line = re.sub("GWIMAGE", gwimage, line)
1374                    if self.current_testbed == master:
1375                        line = re.sub("GWSTART", mgwstart, line)
1376                        line = re.sub("EXPSTART", mexpstart, line)
1377                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1378                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1379                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1380                    else:
1381                        line = re.sub("GWSTART", gwstart, line)
1382                        line = re.sub("EXPSTART", expstart, line)
1383                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1384                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1385                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1386                    #These expansions contain EID and PROJDIR.  NB these are
1387                    # local fedkit and gatewaykit, which are strings.
1388                    if self.fedkit:
1389                        line = re.sub("FEDKIT", self.fedkit, line)
1390                    if self.gatewaykit:
1391                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1392                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1393                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1394                    line = re.sub("EID", self.eid, line)
1395                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1396                            (project, self.eid), line)
1397                    print >>self.testbed_file, line
1398                return True
1399
1400    class allbeds:
1401        """
1402        Process the Allbeds section.  Get access to each federant and save the
1403        parameters in tbparams
1404        """
1405        def __init__(self, get_access):
1406            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1407            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1408            self.in_allbeds = False
1409            self.get_access = get_access
1410
1411        def __call__(self, line, user, tbparams, master, export_project,
1412                access_user):
1413            # Testbed access parameters
1414            if not self.in_allbeds:
1415                if self.begin_allbeds.match(line):
1416                    self.in_allbeds = True
1417                    return True
1418                else:
1419                    return False
1420            else:
1421                if self.end_allbeds.match(line):
1422                    self.in_allbeds = False
1423                else:
1424                    nodes = line.split('|')
1425                    tb = nodes.pop(0)
1426                    self.get_access(tb, nodes, user, tbparams, master,
1427                            export_project, access_user)
1428                return True
1429
1430    class gateways:
1431        def __init__(self, eid, master, tmpdir, gw_pubkey,
1432                gw_secretkey, copy_file, fedkit):
1433            self.begin_gateways = \
1434                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1435            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1436            self.current_gateways = None
1437            self.control_gateway = None
1438            self.active_end = { }
1439
1440            self.eid = eid
1441            self.master = master
1442            self.tmpdir = tmpdir
1443            self.gw_pubkey_base = gw_pubkey
1444            self.gw_secretkey_base = gw_secretkey
1445
1446            self.copy_file = copy_file
1447            self.fedkit = fedkit
1448
1449
1450        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1451                active_end, tbparams, dtb, myname, desthost, type):
1452            """
1453            Produce a gateway configuration file from a gateways line.
1454            """
1455
1456            sproject = tbparams[gw].get('project', 'project')
1457            dproject = tbparams[dtb].get('project', 'project')
1458            sdomain = ".%s.%s%s" % (eid, sproject,
1459                    tbparams[gw].get('domain', ".example.com"))
1460            ddomain = ".%s.%s%s" % (eid, dproject,
1461                    tbparams[dtb].get('domain', ".example.com"))
1462            boss = tbparams[master].get('boss', "boss")
1463            fs = tbparams[master].get('fs', "fs")
1464            event_server = "%s%s" % \
1465                    (tbparams[gw].get('eventserver', "event_server"),
1466                            tbparams[gw].get('domain', "example.com"))
1467            remote_event_server = "%s%s" % \
1468                    (tbparams[dtb].get('eventserver', "event_server"),
1469                            tbparams[dtb].get('domain', "example.com"))
1470            seer_control = "%s%s" % \
1471                    (tbparams[gw].get('control', "control"), sdomain)
1472            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1473
1474            if self.fedkit:
1475                remote_script_dir = "/usr/local/federation/bin"
1476                local_script_dir = "/usr/local/federation/bin"
1477            else:
1478                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1479                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1480
1481            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1482            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1483            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1484
1485            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1486            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1487
1488            # translate to lower case so the `hostname` hack for specifying
1489            # configuration files works.
1490            conf_file = conf_file.lower();
1491            remote_conf_file = remote_conf_file.lower();
1492
1493            if dtb == master:
1494                active = "false"
1495            elif gw == master:
1496                active = "true"
1497            elif active_end.has_key('%s-%s' % (dtb, gw)):
1498                active = "false"
1499            else:
1500                active_end['%s-%s' % (gw, dtb)] = 1
1501                active = "true"
1502
1503            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1504            print >>gwconfig, "Active: %s" % active
1505            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1506            if tunnel_iface:
1507                print >>gwconfig, "Interface: %s" % tunnel_iface
1508            print >>gwconfig, "BossName: %s" % boss
1509            print >>gwconfig, "FsName: %s" % fs
1510            print >>gwconfig, "EventServerName: %s" % event_server
1511            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1512            print >>gwconfig, "SeerControl: %s" % seer_control
1513            print >>gwconfig, "Type: %s" % type
1514            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1515            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1516                    local_script_dir
1517            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1518            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1519            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1520                    (remote_conf_dir, remote_conf_file)
1521            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1522            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1523            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1524            gwconfig.close()
1525
1526            return active == "true"
1527
1528        def __call__(self, line, allocated, tbparams):
1529            # Process gateways
1530            if not self.current_gateways:
1531                m = self.begin_gateways.match(line)
1532                if m:
1533                    self.current_gateways = m.group(1)
1534                    if allocated.has_key(self.current_gateways):
1535                        # This test should always succeed
1536                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1537                        if not os.path.exists(tb_dir):
1538                            try:
1539                                os.mkdir(tb_dir)
1540                            except IOError:
1541                                raise service_error(service_error.internal,
1542                                        "Cannot create %s" % tb_dir)
1543                    else:
1544                        # XXX
1545                        self.log.error("[gateways]: Ignoring gateways for " + \
1546                                "unknown testbed %s" % self.current_gateways)
1547                        self.current_gateways = None
1548                    return True
1549                else:
1550                    return False
1551            else:
1552                m = self.end_gateways.match(line)
1553                if m :
1554                    if m.group(1) != self.current_gateways:
1555                        raise service_error(service_error.internal,
1556                                "Mismatched gateway markers!?")
1557                    if self.control_gateway:
1558                        try:
1559                            cc = open("%s/%s/client.conf" %
1560                                    (self.tmpdir, self.current_gateways), 'w')
1561                            print >>cc, "ControlGateway: %s" % \
1562                                    self.control_gateway
1563                            if tbparams[self.master].has_key('smbshare'):
1564                                print >>cc, "SMBSHare: %s" % \
1565                                        tbparams[self.master]['smbshare']
1566                            print >>cc, "ProjectUser: %s" % \
1567                                    tbparams[self.master]['user']
1568                            print >>cc, "ProjectName: %s" % \
1569                                    tbparams[self.master]['project']
1570                            print >>cc, "ExperimentID: %s/%s" % \
1571                                    ( tbparams[self.master]['project'], \
1572                                    self.eid )
1573                            cc.close()
1574                        except IOError:
1575                            raise service_error(service_error.internal,
1576                                    "Error creating client config")
1577                        # XXX: This seer specific file should disappear
1578                        try:
1579                            cc = open("%s/%s/seer.conf" %
1580                                    (self.tmpdir, self.current_gateways),
1581                                    'w')
1582                            if self.current_gateways != self.master:
1583                                print >>cc, "ControlNode: %s" % \
1584                                        self.control_gateway
1585                            print >>cc, "ExperimentID: %s/%s" % \
1586                                    ( tbparams[self.master]['project'], \
1587                                    self.eid )
1588                            cc.close()
1589                        except IOError:
1590                            raise service_error(service_error.internal,
1591                                    "Error creating seer config")
1592                    else:
1593                        debug.error("[gateways]: No control gateway for %s" %\
1594                                    self.current_gateways)
1595                    self.current_gateways = None
1596                else:
1597                    dtb, myname, desthost, type = line.split(" ")
1598
1599                    if type == "control" or type == "both":
1600                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1601                                self.eid, 
1602                                tbparams[self.current_gateways]['project'],
1603                                tbparams[self.current_gateways]['domain'])
1604                    try:
1605                        active = self.gateway_conf_file(self.current_gateways,
1606                                self.master, self.eid, self.gw_pubkey_base,
1607                                self.gw_secretkey_base,
1608                                self.active_end, tbparams, dtb, myname,
1609                                desthost, type)
1610                    except IOError, e:
1611                        raise service_error(service_error.internal,
1612                                "Failed to write config file for %s" % \
1613                                        self.current_gateway)
1614           
1615                    gw_pubkey = "%s/keys/%s" % \
1616                            (self.tmpdir, self.gw_pubkey_base)
1617                    gw_secretkey = "%s/keys/%s" % \
1618                            (self.tmpdir, self.gw_secretkey_base)
1619
1620                    pkfile = "%s/%s/%s" % \
1621                            ( self.tmpdir, self.current_gateways, 
1622                                    self.gw_pubkey_base)
1623                    skfile = "%s/%s/%s" % \
1624                            ( self.tmpdir, self.current_gateways, 
1625                                    self.gw_secretkey_base)
1626
1627                    if not os.path.exists(pkfile):
1628                        try:
1629                            self.copy_file(gw_pubkey, pkfile)
1630                        except IOError:
1631                            service_error(service_error.internal,
1632                                    "Failed to copy pubkey file")
1633
1634                    if active and not os.path.exists(skfile):
1635                        try:
1636                            self.copy_file(gw_secretkey, skfile)
1637                        except IOError:
1638                            service_error(service_error.internal,
1639                                    "Failed to copy secretkey file")
1640                return True
1641
1642    class shunt_to_file:
1643        """
1644        Simple class to write data between two regexps to a file.
1645        """
1646        def __init__(self, begin, end, filename):
1647            """
1648            Begin shunting on a match of begin, stop on end, send data to
1649            filename.
1650            """
1651            self.begin = re.compile(begin)
1652            self.end = re.compile(end)
1653            self.in_shunt = False
1654            self.file = None
1655            self.filename = filename
1656
1657        def __call__(self, line):
1658            """
1659            Call this on each line in the input that may be shunted.
1660            """
1661            if not self.in_shunt:
1662                if self.begin.match(line):
1663                    self.in_shunt = True
1664                    try:
1665                        self.file = open(self.filename, "w")
1666                    except:
1667                        self.file = None
1668                        raise
1669                    return True
1670                else:
1671                    return False
1672            else:
1673                if self.end.match(line):
1674                    if self.file: 
1675                        self.file.close()
1676                        self.file = None
1677                    self.in_shunt = False
1678                else:
1679                    if self.file:
1680                        print >>self.file, line
1681                return True
1682
1683    class shunt_to_list:
1684        """
1685        Same interface as shunt_to_file.  Data collected in self.list, one list
1686        element per line.
1687        """
1688        def __init__(self, begin, end):
1689            self.begin = re.compile(begin)
1690            self.end = re.compile(end)
1691            self.in_shunt = False
1692            self.list = [ ]
1693       
1694        def __call__(self, line):
1695            if not self.in_shunt:
1696                if self.begin.match(line):
1697                    self.in_shunt = True
1698                    return True
1699                else:
1700                    return False
1701            else:
1702                if self.end.match(line):
1703                    self.in_shunt = False
1704                else:
1705                    self.list.append(line)
1706                return True
1707
1708    class shunt_to_string:
1709        """
1710        Same interface as shunt_to_file.  Data collected in self.str, all in
1711        one string.
1712        """
1713        def __init__(self, begin, end):
1714            self.begin = re.compile(begin)
1715            self.end = re.compile(end)
1716            self.in_shunt = False
1717            self.str = ""
1718       
1719        def __call__(self, line):
1720            if not self.in_shunt:
1721                if self.begin.match(line):
1722                    self.in_shunt = True
1723                    return True
1724                else:
1725                    return False
1726            else:
1727                if self.end.match(line):
1728                    self.in_shunt = False
1729                else:
1730                    self.str += line
1731                return True
1732
1733    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1734            tbparams, tmpdir, alloc_log=None):
1735        started = { }           # Testbeds where a sub-experiment started
1736                                # successfully
1737
1738        # XXX
1739        fail_soft = False
1740
1741        log = alloc_log or self.log
1742
1743        thread_pool = self.thread_pool(self.nthreads)
1744        threads = [ ]
1745
1746        for tb in [ k for k in allocated.keys() if k != master]:
1747            # Create and start a thread to start the segment, and save it to
1748            # get the return value later
1749            thread_pool.wait_for_slot()
1750            t  = self.pooled_thread(\
1751                    target=self.start_segment(log=log,
1752                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1753                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1754                    pdata=thread_pool, trace_file=self.trace_file)
1755            threads.append(t)
1756            t.start()
1757
1758        # Wait until all finish
1759        thread_pool.wait_for_all_done()
1760
1761        # If none failed, start the master
1762        failed = [ t.getName() for t in threads if not t.rv ]
1763
1764        if len(failed) == 0:
1765            starter = self.start_segment(log=log, 
1766                    keyfile=self.ssh_privkey_file, debug=self.debug)
1767            if not starter(master, eid, tbparams, tmpdir):
1768                failed.append(master)
1769
1770        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1771        # If one failed clean up, unless fail_soft is set
1772        if failed:
1773            if not fail_soft:
1774                thread_pool.clear()
1775                for tb in succeeded:
1776                    # Create and start a thread to stop the segment
1777                    thread_pool.wait_for_slot()
1778                    t  = self.pooled_thread(\
1779                            target=self.stop_segment(log=log,
1780                                keyfile=self.ssh_privkey_file,
1781                                debug=self.debug), 
1782                            args=(tb, eid, tbparams), name=tb,
1783                            pdata=thread_pool, trace_file=self.trace_file)
1784                    t.start()
1785                # Wait until all finish
1786                thread_pool.wait_for_all_done()
1787
1788                # release the allocations
1789                for tb in tbparams.keys():
1790                    self.release_access(tb, tbparams[tb]['allocID'])
1791                # Remove the placeholder
1792                self.state_lock.acquire()
1793                self.state[eid]['experimentStatus'] = 'failed'
1794                if self.state_filename: self.write_state()
1795                self.state_lock.release()
1796
1797                #raise service_error(service_error.federant,
1798                #    "Swap in failed on %s" % ",".join(failed))
1799                log.error("Swap in failed on %s" % ",".join(failed))
1800                return
1801        else:
1802            log.info("[start_segment]: Experiment %s active" % eid)
1803
1804        log.debug("[start_experiment]: removing %s" % tmpdir)
1805
1806        # Walk up tmpdir, deleting as we go
1807        for path, dirs, files in os.walk(tmpdir, topdown=False):
1808            for f in files:
1809                os.remove(os.path.join(path, f))
1810            for d in dirs:
1811                os.rmdir(os.path.join(path, d))
1812        os.rmdir(tmpdir)
1813
1814        # Insert the experiment into our state and update the disk copy
1815        self.state_lock.acquire()
1816        self.state[expid]['experimentStatus'] = 'active'
1817        self.state[eid] = self.state[expid]
1818        if self.state_filename: self.write_state()
1819        self.state_lock.release()
1820        return
1821
1822    def create_experiment(self, req, fid):
1823        """
1824        The external interface to experiment creation called from the
1825        dispatcher.
1826
1827        Creates a working directory, splits the incoming description using the
1828        splitter script and parses out the avrious subsections using the
1829        lcasses above.  Once each sub-experiment is created, use pooled threads
1830        to instantiate them and start it all up.
1831        """
1832
1833        if not self.auth.check_attribute(fid, 'create'):
1834            raise service_error(service_error.access, "Create access denied")
1835
1836        try:
1837            tmpdir = tempfile.mkdtemp(prefix="split-")
1838        except IOError:
1839            raise service_error(service_error.internal, "Cannot create tmp dir")
1840
1841        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1842        gw_secretkey_base = "fed.%s" % self.ssh_type
1843        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1844        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1845        tclfile = tmpdir + "/experiment.tcl"
1846        tbparams = { }
1847        try:
1848            access_user = self.accessdb[fid]
1849        except KeyError:
1850            raise service_error(service_error.internal,
1851                    "Access map and authorizer out of sync in " + \
1852                            "create_experiment for fedid %s"  % fid)
1853
1854        pid = "dummy"
1855        gid = "dummy"
1856        try:
1857            os.mkdir(tmpdir+"/keys")
1858        except OSError:
1859            raise service_error(service_error.internal,
1860                    "Can't make temporary dir")
1861
1862        req = req.get('CreateRequestBody', None)
1863        if not req:
1864            raise service_error(service_error.req,
1865                    "Bad request format (no CreateRequestBody)")
1866        # The tcl parser needs to read a file so put the content into that file
1867        descr=req.get('experimentdescription', None)
1868        if descr:
1869            file_content=descr.get('ns2description', None)
1870            if file_content:
1871                try:
1872                    f = open(tclfile, 'w')
1873                    f.write(file_content)
1874                    f.close()
1875                except IOError:
1876                    raise service_error(service_error.internal,
1877                            "Cannot write temp experiment description")
1878            else:
1879                raise service_error(service_error.req, 
1880                        "Only ns2descriptions supported")
1881        else:
1882            raise service_error(service_error.req, "No experiment description")
1883
1884        # Generate an ID for the experiment (slice) and a certificate that the
1885        # allocator can use to prove they own it.  We'll ship it back through
1886        # the encrypted connection.
1887        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1888
1889        if req.has_key('experimentID') and \
1890                req['experimentID'].has_key('localname'):
1891            overwrite = False
1892            eid = req['experimentID']['localname']
1893            # If there's an old failed experiment here with the same local name
1894            # and accessible by this user, we'll overwrite it, otherwise we'll
1895            # fall through and do the collision avoidance.
1896            old_expid = self.get_experiment_fedid(eid)
1897            if old_expid and self.check_experiment_access(fid, old_expid):
1898                self.state_lock.acquire()
1899                status = self.state[eid].get('experimentStatus', None)
1900                if status and status == 'failed':
1901                    # remove the old access attribute
1902                    self.auth.unset_attribute(fid, old_expid)
1903                    overwrite = True
1904                    del self.state[eid]
1905                    del self.state[old_expid]
1906                self.state_lock.release()
1907            self.state_lock.acquire()
1908            while (self.state.has_key(eid) and not overwrite):
1909                eid += random.choice(string.ascii_letters)
1910            # Initial state
1911            self.state[eid] = {
1912                    'experimentID' : \
1913                            [ { 'localname' : eid }, {'fedid': expid } ],
1914                    'experimentStatus': 'starting',
1915                    'experimentAccess': { 'X509' : expcert },
1916                    'owner': fid,
1917                    'log' : [],
1918                }
1919            self.state[expid] = self.state[eid]
1920            if self.state_filename: self.write_state()
1921            self.state_lock.release()
1922        else:
1923            eid = self.exp_stem
1924            for i in range(0,5):
1925                eid += random.choice(string.ascii_letters)
1926            self.state_lock.acquire()
1927            while (self.state.has_key(eid)):
1928                eid = self.exp_stem
1929                for i in range(0,5):
1930                    eid += random.choice(string.ascii_letters)
1931            # Initial state
1932            self.state[eid] = {
1933                    'experimentID' : \
1934                            [ { 'localname' : eid }, {'fedid': expid } ],
1935                    'experimentStatus': 'starting',
1936                    'experimentAccess': { 'X509' : expcert },
1937                    'owner': fid,
1938                    'log' : [],
1939                }
1940            self.state[expid] = self.state[eid]
1941            if self.state_filename: self.write_state()
1942            self.state_lock.release()
1943
1944        try: 
1945            # This catches exceptions to clear the placeholder if necessary
1946            try:
1947                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1948            except ValueError:
1949                raise service_error(service_error.server_config, 
1950                        "Bad key type (%s)" % self.ssh_type)
1951
1952            user = req.get('user', None)
1953            if user == None:
1954                raise service_error(service_error.req, "No user")
1955
1956            master = req.get('master', None)
1957            if not master:
1958                raise service_error(service_error.req,
1959                        "No master testbed label")
1960            export_project = req.get('exportProject', None)
1961            if not export_project:
1962                raise service_error(service_error.req, "No export project")
1963           
1964            if self.splitter_url:
1965                self.log.debug("Calling remote splitter at %s" % \
1966                        self.splitter_url)
1967                split_data = self.remote_splitter(self.splitter_url,
1968                        file_content, master)
1969            else:
1970                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1971                    str(self.muxmax), '-m', master]
1972
1973                if self.fedkit:
1974                    tclcmd.append('-k')
1975
1976                if self.gatewaykit:
1977                    tclcmd.append('-K')
1978
1979                tclcmd.extend([pid, gid, eid, tclfile])
1980
1981                self.log.debug("running local splitter %s", " ".join(tclcmd))
1982                # This is just fantastic.  As a side effect the parser copies
1983                # tb_compat.tcl into the current directory, so that directory
1984                # must be writable by the fedd user.  Doing this in the
1985                # temporary subdir ensures this is the case.
1986                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1987                        cwd=tmpdir)
1988                split_data = tclparser.stdout
1989
1990            allocated = { }         # Testbeds we can access
1991            # Objects to parse the splitter output (defined above)
1992            parse_current_testbed = self.current_testbed(eid, tmpdir,
1993                    self.fedkit, self.gatewaykit)
1994            parse_allbeds = self.allbeds(self.get_access)
1995            parse_gateways = self.gateways(eid, master, tmpdir,
1996                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1997                    self.fedkit)
1998            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1999                        "^#\s+End\s+Vtopo")
2000            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
2001                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
2002            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
2003                    "^#\s+End\s+tarfiles")
2004            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
2005                    "^#\s+End\s+rpms")
2006
2007            # Working on the split data
2008            for line in split_data:
2009                line = line.rstrip()
2010                if parse_current_testbed(line, master, allocated, tbparams):
2011                    continue
2012                elif parse_allbeds(line, user, tbparams, master, export_project,
2013                        access_user):
2014                    continue
2015                elif parse_gateways(line, allocated, tbparams):
2016                    continue
2017                elif parse_vtopo(line):
2018                    continue
2019                elif parse_hostnames(line):
2020                    continue
2021                elif parse_tarfiles(line):
2022                    continue
2023                elif parse_rpms(line):
2024                    continue
2025                else:
2026                    raise service_error(service_error.internal, 
2027                            "Bad tcl parse? %s" % line)
2028            # Virtual topology and visualization
2029            vtopo = self.gentopo(parse_vtopo.str)
2030            if not vtopo:
2031                raise service_error(service_error.internal, 
2032                        "Failed to generate virtual topology")
2033
2034            vis = self.genviz(vtopo)
2035            if not vis:
2036                raise service_error(service_error.internal, 
2037                        "Failed to generate visualization")
2038
2039           
2040            # save federant information
2041            for k in allocated.keys():
2042                tbparams[k]['federant'] = {\
2043                        'name': [ { 'localname' : eid} ],\
2044                        'emulab': tbparams[k]['emulab'],\
2045                        'allocID' : tbparams[k]['allocID'],\
2046                        'master' : k == master,\
2047                    }
2048
2049            self.state_lock.acquire()
2050            self.state[eid]['vtopo'] = vtopo
2051            self.state[eid]['vis'] = vis
2052            self.state[expid]['federant'] = \
2053                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2054                        if tbparams[tb].has_key('federant') ]
2055            if self.state_filename: self.write_state()
2056            self.state_lock.release()
2057
2058            # Copy tarfiles and rpms needed at remote sites into a staging area
2059            try:
2060                if self.fedkit:
2061                    for t in self.fedkit:
2062                        parse_tarfiles.list.append(t[1])
2063                if self.gatewaykit:
2064                    for t in self.gatewaykit:
2065                        parse_tarfiles.list.append(t[1])
2066                for t in parse_tarfiles.list:
2067                    if not os.path.exists("%s/tarfiles" % tmpdir):
2068                        os.mkdir("%s/tarfiles" % tmpdir)
2069                    self.copy_file(t, "%s/tarfiles/%s" % \
2070                            (tmpdir, os.path.basename(t)))
2071                for r in parse_rpms.list:
2072                    if not os.path.exists("%s/rpms" % tmpdir):
2073                        os.mkdir("%s/rpms" % tmpdir)
2074                    self.copy_file(r, "%s/rpms/%s" % \
2075                            (tmpdir, os.path.basename(r)))
2076                # A null experiment file in case we need to create a remote
2077                # experiment from scratch
2078                f = open("%s/null.tcl" % tmpdir, "w")
2079                print >>f, """
2080set ns [new Simulator]
2081source tb_compat.tcl
2082
2083set a [$ns node]
2084
2085$ns rtproto Session
2086$ns run
2087"""
2088                f.close()
2089
2090            except IOError, e:
2091                raise service_error(service_error.internal, 
2092                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2093
2094        except service_error, e:
2095            # If something goes wrong in the parse (usually an access error)
2096            # clear the placeholder state.  From here on out the code delays
2097            # exceptions.  Failing at this point returns a fault to the remote
2098            # caller.
2099            self.state_lock.acquire()
2100            del self.state[eid]
2101            del self.state[expid]
2102            if self.state_filename: self.write_state()
2103            self.state_lock.release()
2104            raise e
2105
2106
2107        # Start the background swapper and return the starting state.  From
2108        # here on out, the state will stick around a while.
2109
2110        # Let users touch the state
2111        self.auth.set_attribute(fid, expid)
2112        self.auth.set_attribute(expid, expid)
2113        # Override fedids can manipulate state as well
2114        for o in self.overrides:
2115            self.auth.set_attribute(o, expid)
2116
2117        # Create a logger that logs to the experiment's state object as well as
2118        # to the main log file.
2119        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2120        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2121        # XXX: there should be a global one of these rather than repeating the
2122        # code.
2123        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2124                    '%d %b %y %H:%M:%S'))
2125        alloc_log.addHandler(h)
2126       
2127        # Start a thread to do the resource allocation
2128        t  = Thread(target=self.allocate_resources,
2129                args=(allocated, master, eid, expid, expcert, tbparams, 
2130                    tmpdir, alloc_log),
2131                name=eid)
2132        t.start()
2133
2134        rv = {
2135                'experimentID': [
2136                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2137                ],
2138                'experimentStatus': 'starting',
2139                'experimentAccess': { 'X509' : expcert }
2140            }
2141
2142        return rv
2143
2144    class new_start_segment:
2145        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
2146                cert_pwd=None, trusted_certs=None, caller=None):
2147            self.log = log
2148            self.debug = debug
2149            self.cert_file = cert_file
2150            self.cert_pwd = cert_pwd
2151            self.trusted_certs = None
2152            self.caller = caller
2153            self.testbed = testbed
2154
2155        def __call__(self, uri, aid, topo, master, attrs=None):
2156            req = {
2157                    'allocID': { 'fedid' : aid }, 
2158                    'segmentdescription': { 
2159                        'topdldescription': topo.to_dict(),
2160                    },
2161                    'master': master,
2162                }
2163            if attrs:
2164                req['fedAttr'] = attrs
2165
2166            try:
2167                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2168                        self.trusted_certs)
2169                return True
2170            except service_error, e:
2171                self.log.error("Start segment failed on %s: %s" % \
2172                        (self.testbed, e))
2173                return False
2174
2175
2176
2177    class new_terminate_segment:
2178        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
2179                cert_pwd=None, trusted_certs=None, caller=None):
2180            self.log = log
2181            self.debug = debug
2182            self.cert_file = cert_file
2183            self.cert_pwd = cert_pwd
2184            self.trusted_certs = None
2185            self.caller = caller
2186            self.testbed = testbed
2187
2188        def __call__(self, uri, aid ):
2189            print "in terminate_segment: %s" % aid
2190            req = {
2191                    'allocID': aid , 
2192                }
2193            try:
2194                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2195                        self.trusted_certs)
2196                return True
2197            except service_error, e:
2198                self.log.error("Terminate segment failed on %s: %s" % \
2199                        (self.testbed, e))
2200                return False
2201
2202
2203   
2204
2205    def new_allocate_resources(self, allocated, master, eid, expid, expcert, 
2206            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
2207        started = { }           # Testbeds where a sub-experiment started
2208                                # successfully
2209
2210        # XXX
2211        fail_soft = False
2212
2213        log = alloc_log or self.log
2214
2215        thread_pool = self.thread_pool(self.nthreads)
2216        threads = [ ]
2217
2218        for tb in [ k for k in allocated.keys() if k != master]:
2219            # Create and start a thread to start the segment, and save it to
2220            # get the return value later
2221            thread_pool.wait_for_slot()
2222            uri = self.tbmap.get(tb, None)
2223            if not uri:
2224                raise service_error(service_error.internal, 
2225                        "Unknown testbed %s !?" % tb)
2226
2227            if tbparams[tb].has_key('allocID') and \
2228                    tbparams[tb]['allocID'].has_key('fedid'):
2229                aid = tbparams[tb]['allocID']['fedid']
2230            else:
2231                raise service_error(service_error.internal, 
2232                        "No alloc id for testbed %s !?" % tb)
2233
2234            t  = self.pooled_thread(\
2235                    target=self.new_start_segment(log=log, debug=self.debug,
2236                        testbed=tb, cert_file=self.cert_file,
2237                        cert_pwd=self.cert_pwd,
2238                        trusted_certs=self.trusted_certs,
2239                        caller=self.call_StartSegment), 
2240                    args=(uri, aid, topo[tb], False, attrs), name=tb,
2241                    pdata=thread_pool, trace_file=self.trace_file)
2242            threads.append(t)
2243            t.start()
2244
2245        # Wait until all finish
2246        thread_pool.wait_for_all_done()
2247
2248        # If none failed, start the master
2249        failed = [ t.getName() for t in threads if not t.rv ]
2250
2251        if len(failed) == 0:
2252            uri = self.tbmap.get(master, None)
2253            if not uri:
2254                raise service_error(service_error.internal, 
2255                        "Unknown testbed %s !?" % master)
2256
2257            if tbparams[master].has_key('allocID') and \
2258                    tbparams[master]['allocID'].has_key('fedid'):
2259                aid = tbparams[master]['allocID']['fedid']
2260            else:
2261                raise service_error(service_error.internal, 
2262                    "No alloc id for testbed %s !?" % master)
2263            starter = self.new_start_segment(log=log, debug=self.debug,
2264                    testbed=master, cert_file=self.cert_file,
2265                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
2266                    caller=self.call_StartSegment)
2267            if not starter(uri, aid, topo[master], True, attrs):
2268                failed.append(master)
2269
2270        succeeded = [tb for tb in allocated.keys() if tb not in failed]
2271        # If one failed clean up, unless fail_soft is set
2272        if failed and False:
2273            if not fail_soft:
2274                thread_pool.clear()
2275                for tb in succeeded:
2276                    # Create and start a thread to stop the segment
2277                    thread_pool.wait_for_slot()
2278                    t  = self.pooled_thread(\
2279                            target=self.stop_segment(log=log,
2280                                testbed=tb,
2281                                keyfile=self.ssh_privkey_file,
2282                                debug=self.debug), 
2283                            args=(tb, eid, tbparams), name=tb,
2284                            pdata=thread_pool, trace_file=self.trace_file)
2285                    t.start()
2286                # Wait until all finish
2287                thread_pool.wait_for_all_done()
2288
2289                # release the allocations
2290                for tb in tbparams.keys():
2291                    self.release_access(tb, tbparams[tb]['allocID'])
2292                # Remove the placeholder
2293                self.state_lock.acquire()
2294                self.state[eid]['experimentStatus'] = 'failed'
2295                if self.state_filename: self.write_state()
2296                self.state_lock.release()
2297
2298                log.error("Swap in failed on %s" % ",".join(failed))
2299                return
2300        else:
2301            log.info("[start_segment]: Experiment %s active" % eid)
2302
2303        log.debug("[start_experiment]: removing %s" % tmpdir)
2304
2305        # Walk up tmpdir, deleting as we go
2306        for path, dirs, files in os.walk(tmpdir, topdown=False):
2307            for f in files:
2308                os.remove(os.path.join(path, f))
2309            for d in dirs:
2310                os.rmdir(os.path.join(path, d))
2311        os.rmdir(tmpdir)
2312
2313        # Insert the experiment into our state and update the disk copy
2314        self.state_lock.acquire()
2315        self.state[expid]['experimentStatus'] = 'active'
2316        self.state[eid] = self.state[expid]
2317        if self.state_filename: self.write_state()
2318        self.state_lock.release()
2319        return
2320
2321
2322    def new_create_experiment(self, req, fid):
2323        """
2324        The external interface to experiment creation called from the
2325        dispatcher.
2326
2327        Creates a working directory, splits the incoming description using the
2328        splitter script and parses out the avrious subsections using the
2329        lcasses above.  Once each sub-experiment is created, use pooled threads
2330        to instantiate them and start it all up.
2331        """
2332
2333        def add_kit(e, kit):
2334            """
2335            Add a Software object created from the list of (install, location)
2336            tuples passed as kit  to the software attribute of an object e.  We
2337            do this enough to break out the code, but it's kind of a hack to
2338            avoid changing the old tuple rep.
2339            """
2340
2341            s = [ topdl.Software(install=i, location=l) for i, l in kit]
2342
2343            if isinstance(e.software, list): e.software.extend(s)
2344            else: e.software = s
2345
2346
2347        if not self.auth.check_attribute(fid, 'create'):
2348            raise service_error(service_error.access, "Create access denied")
2349
2350        try:
2351            tmpdir = tempfile.mkdtemp(prefix="split-")
2352        except IOError:
2353            raise service_error(service_error.internal, "Cannot create tmp dir")
2354
2355        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2356        gw_secretkey_base = "fed.%s" % self.ssh_type
2357        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2358        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2359        tclfile = tmpdir + "/experiment.tcl"
2360        tbparams = { }
2361        try:
2362            access_user = self.accessdb[fid]
2363        except KeyError:
2364            raise service_error(service_error.internal,
2365                    "Access map and authorizer out of sync in " + \
2366                            "create_experiment for fedid %s"  % fid)
2367
2368        pid = "dummy"
2369        gid = "dummy"
2370        try:
2371            os.mkdir(tmpdir+"/keys")
2372        except OSError:
2373            raise service_error(service_error.internal,
2374                    "Can't make temporary dir")
2375
2376        req = req.get('CreateRequestBody', None)
2377        if not req:
2378            raise service_error(service_error.req,
2379                    "Bad request format (no CreateRequestBody)")
2380        # The tcl parser needs to read a file so put the content into that file
2381        descr=req.get('experimentdescription', None)
2382        if descr:
2383            file_content=descr.get('ns2description', None)
2384            if file_content:
2385                try:
2386                    f = open(tclfile, 'w')
2387                    f.write(file_content)
2388                    f.close()
2389                except IOError:
2390                    raise service_error(service_error.internal,
2391                            "Cannot write temp experiment description")
2392            else:
2393                raise service_error(service_error.req, 
2394                        "Only ns2descriptions supported")
2395        else:
2396            raise service_error(service_error.req, "No experiment description")
2397
2398        # Generate an ID for the experiment (slice) and a certificate that the
2399        # allocator can use to prove they own it.  We'll ship it back through
2400        # the encrypted connection.
2401        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2402
2403        if req.has_key('experimentID') and \
2404                req['experimentID'].has_key('localname'):
2405            overwrite = False
2406            eid = req['experimentID']['localname']
2407            # If there's an old failed experiment here with the same local name
2408            # and accessible by this user, we'll overwrite it, otherwise we'll
2409            # fall through and do the collision avoidance.
2410            old_expid = self.get_experiment_fedid(eid)
2411            if old_expid and self.check_experiment_access(fid, old_expid):
2412                self.state_lock.acquire()
2413                status = self.state[eid].get('experimentStatus', None)
2414                if status and status == 'failed':
2415                    # remove the old access attribute
2416                    self.auth.unset_attribute(fid, old_expid)
2417                    overwrite = True
2418                    del self.state[eid]
2419                    del self.state[old_expid]
2420                self.state_lock.release()
2421            self.state_lock.acquire()
2422            while (self.state.has_key(eid) and not overwrite):
2423                eid += random.choice(string.ascii_letters)
2424            # Initial state
2425            self.state[eid] = {
2426                    'experimentID' : \
2427                            [ { 'localname' : eid }, {'fedid': expid } ],
2428                    'experimentStatus': 'starting',
2429                    'experimentAccess': { 'X509' : expcert },
2430                    'owner': fid,
2431                    'log' : [],
2432                }
2433            self.state[expid] = self.state[eid]
2434            if self.state_filename: self.write_state()
2435            self.state_lock.release()
2436        else:
2437            eid = self.exp_stem
2438            for i in range(0,5):
2439                eid += random.choice(string.ascii_letters)
2440            self.state_lock.acquire()
2441            while (self.state.has_key(eid)):
2442                eid = self.exp_stem
2443                for i in range(0,5):
2444                    eid += random.choice(string.ascii_letters)
2445            # Initial state
2446            self.state[eid] = {
2447                    'experimentID' : \
2448                            [ { 'localname' : eid }, {'fedid': expid } ],
2449                    'experimentStatus': 'starting',
2450                    'experimentAccess': { 'X509' : expcert },
2451                    'owner': fid,
2452                    'log' : [],
2453                }
2454            self.state[expid] = self.state[eid]
2455            if self.state_filename: self.write_state()
2456            self.state_lock.release()
2457
2458        try: 
2459            # This catches exceptions to clear the placeholder if necessary
2460            try:
2461                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2462            except ValueError:
2463                raise service_error(service_error.server_config, 
2464                        "Bad key type (%s)" % self.ssh_type)
2465
2466            user = req.get('user', None)
2467            if user == None:
2468                raise service_error(service_error.req, "No user")
2469
2470            master = req.get('master', None)
2471            if not master:
2472                raise service_error(service_error.req,
2473                        "No master testbed label")
2474            export_project = req.get('exportProject', None)
2475            if not export_project:
2476                raise service_error(service_error.req, "No export project")
2477           
2478            if self.splitter_url:
2479                self.log.debug("Calling remote splitter at %s" % \
2480                        self.splitter_url)
2481                split_data = self.remote_splitter(self.splitter_url,
2482                        file_content, master)
2483            else:
2484                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2485                    str(self.muxmax), '-m', master]
2486
2487                if self.fedkit:
2488                    tclcmd.append('-k')
2489
2490                if self.gatewaykit:
2491                    tclcmd.append('-K')
2492
2493                tclcmd.extend([pid, gid, eid, tclfile])
2494
2495                self.log.debug("running local splitter %s", " ".join(tclcmd))
2496                # This is just fantastic.  As a side effect the parser copies
2497                # tb_compat.tcl into the current directory, so that directory
2498                # must be writable by the fedd user.  Doing this in the
2499                # temporary subdir ensures this is the case.
2500                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2501                        cwd=tmpdir)
2502                split_data = tclparser.stdout
2503
2504            allocated = { }         # Testbeds we can access
2505            # Allocate IP addresses: The allocator is a buddy system memory
2506            # allocator.  Allocate from the largest substrate to the
2507            # smallest to make the packing more likely to work - i.e.
2508            # avoiding internal fragmentation.
2509            top = topdl.topology_from_xml(file=split_data, top="experiment")
2510            subs = sorted(top.substrates, 
2511                    cmp=lambda x,y: cmp(len(x.interfaces), 
2512                        len(y.interfaces)),
2513                    reverse=True)
2514            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
2515            ifs = { }
2516            hosts = [ ]
2517            # The config urlpath
2518            configpath = "/%s/config" % expid
2519            # The config file system location
2520            configdir ="%s%s" % ( self.repodir, configpath)
2521
2522            for idx, s in enumerate(subs):
2523                a = ips.allocate(len(s.interfaces)+2)
2524                if a :
2525                    base, num = a
2526                    if num < len(s.interfaces) +2 : 
2527                        raise service_error(service_error.internal,
2528                                "Allocator returned wrong number of IPs??")
2529                else:
2530                    raise service_error(service_error.req, 
2531                            "Cannot allocate IP addresses")
2532
2533                base += 1
2534                for i in s.interfaces:
2535                    i.attribute.append(
2536                            topdl.Attribute('ip4_address', 
2537                                "%s" % ip_addr(base)))
2538                    hname = i.element.name[0]
2539                    if ifs.has_key(hname):
2540                        hosts.append("%s\t%s-%s %s-%d" % \
2541                                (ip_addr(base), hname, s.name, hname,
2542                                    ifs[hname]))
2543                    else:
2544                        ifs[hname] = 0
2545                        hosts.append("%s\t%s-%s %s-%d %s" % \
2546                                (ip_addr(base), hname, s.name, hname,
2547                                    ifs[hname], hname))
2548
2549                    ifs[hname] += 1
2550                    base += 1
2551            # save config files
2552            try:
2553                os.makedirs(configdir)
2554            except IOError, e:
2555                raise service_error(
2556                        "Cannot create config directory: %s" % e)
2557            # Find the testbeds to look up
2558            testbeds = set([ a.value for e in top.elements \
2559                    for a in e.attribute \
2560                        if a.attribute == 'testbed'] )
2561
2562
2563            # Make per testbed topologies.  Copy the main topo and remove
2564            # interfaces and nodes that don't live in the testbed.
2565            topo ={ }
2566            for tb in testbeds:
2567                self.get_access(tb, None, user, tbparams, master,
2568                        export_project, access_user)
2569                allocated[tb] = 1
2570                topo[tb] = top.clone()
2571                to_delete = [ ]
2572                for e in topo[tb].elements:
2573                    etb = e.get_attribute('testbed')
2574                    if etb and etb != tb:
2575                        for i in e.interface:
2576                            for s in i.subs:
2577                                try:
2578                                    s.interfaces.remove(i)
2579                                except ValueError:
2580                                    raise service_error(service_error.internal,
2581                                            "Can't remove interface??")
2582                        to_delete.append(e)
2583                for e in to_delete:
2584                    topo[tb].elements.remove(e)
2585                topo[tb].make_indices()
2586
2587                for e in topo[tb].elements:
2588                    if tb == master:
2589                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
2590                    else:
2591                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
2592                    scmd = e.get_attribute('startup')
2593                    if scmd:
2594                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
2595
2596                    e.set_attribute('startup', cmd)
2597                    if self.fedkit: add_kit(e, self.fedkit)
2598
2599            # Copy configuration files into the remote file store
2600            try:
2601                f = open("%s/hosts" % configdir, "w")
2602                f.write('\n'.join(hosts))
2603                f.close()
2604            except IOError, e:
2605                raise service_error(service_error.internal, 
2606                        "Cannot write hosts file: %s" % e)
2607            try:
2608                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
2609                        (configdir, gw_pubkey_base))
2610                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
2611                        (configdir, gw_secretkey_base))
2612            except IOError, e:
2613                raise service_error(service_error.internal, 
2614                        "Cannot copy keyfiles: %s" % e)
2615
2616            # Allow the individual testbeds to access the configuration files.
2617            for tb in tbparams.keys():
2618                asignee = tbparams[tb]['allocID']['fedid']
2619                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2620                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2621                    print "assigned %s/%s" % (configpath, f)
2622
2623            # Now, for each substrate in the main topology, find those that
2624            # have nodes on more than one testbed.  Insert portal nodes
2625            # into the copies of those substrates on the sub topologies.
2626            for s in top.substrates:
2627                # tbs will contain an ip address on this subsrate that is in
2628                # each testbed.
2629                tbs = { }
2630                for i in s.interfaces:
2631                    e = i.element
2632                    tb = e.get_attribute('testbed')
2633                    if tb and not tbs.has_key(tb):
2634                        for i in e.interface:
2635                            if s in i.subs:
2636                                tbs[tb]= i.get_attribute('ip4_address')
2637                if len(tbs) < 2:
2638                    continue
2639
2640                # More than one testbed is on this substrate.  Insert
2641                # some portals into the subtopologies.  st == source testbed,
2642                # dt == destination testbed.
2643                segment_substrate = { }
2644                for st in tbs.keys():
2645                    segment_substrate[st] = { }
2646                    for dt in [ t for t in tbs.keys() if t != st]:
2647                        myname =  "%stunnel" % dt
2648                        desthost  =  "%stunnel" % st
2649                        sproject = tbparams[st].get('project', 'project')
2650                        dproject = tbparams[dt].get('project', 'project')
2651                        mproject = tbparams[master].get('project', 'project')
2652                        sdomain = tbparams[st].get('domain', ".example.com")
2653                        ddomain = tbparams[dt].get('domain', ".example.com")
2654                        mdomain = tbparams[master].get('domain', '.example.com')
2655                        muser = tbparams[master].get('user', 'root')
2656                        smbshare = tbparams[master].get('smbshare', 'USERS')
2657                        # XXX: active and type need to be unkludged
2658                        active = ("%s" % (st == master))
2659                        if not segment_substrate[st].has_key(dt):
2660                            # Put a substrate and a segment for the connected
2661                            # testbed in there.
2662                            tsubstrate = \
2663                                    topdl.Substrate(name='%s-%s' % (st, dt),
2664                                            attribute= [
2665                                                topdl.Attribute(
2666                                                    attribute='portal',
2667                                                    value='true')
2668                                                ]
2669                                            )
2670                            segment_element = topdl.Segment(
2671                                    id= tbparams[dt]['allocID'],
2672                                    type='emulab',
2673                                    uri = self.tbmap.get(dt, None),
2674                                    interface=[ 
2675                                        topdl.Interface(
2676                                            substrate=tsubstrate.name),
2677                                        ],
2678                                    attribute = [
2679                                        topdl.Attribute(attribute=n, value=v)
2680                                            for n, v in (\
2681                                                ('domain', ddomain),
2682                                                ('experiment', "%s/%s" % \
2683                                                        (dproject, eid)),)
2684                                        ],
2685                                    )
2686                            segment_substrate[st][dt] = tsubstrate
2687                            topo[st].substrates.append(tsubstrate)
2688                            topo[st].elements.append(segment_element)
2689                        portal = topdl.Computer(
2690                                name="%stunnel" % dt,
2691                                attribute=[ 
2692                                    topdl.Attribute(attribute=n,value=v)
2693                                        for n, v in (\
2694                                            ('portal', 'true'),
2695                                            ('domain', sdomain),
2696                                            ('masterdomain', mdomain),
2697                                            ('masterexperiment', "%s/%s" % \
2698                                                    (mproject, eid)),
2699                                            ('masteruser', muser),
2700                                            ('smbshare', smbshare),
2701                                            ('experiment', "%s/%s" % \
2702                                                    (sproject, eid)),
2703                                            ('peer', "%s" % desthost),
2704                                            ('peer_segment', "%s" % \
2705                                                    tbparams[dt]['allocID']['fedid']),
2706                                            ('scriptdir', 
2707                                                "/usr/local/federation/bin"),
2708                                            ('active', "%s" % active),
2709                                            ('portal_type', 'both'), 
2710                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
2711                                    ],
2712                                interface=[
2713                                    topdl.Interface(
2714                                        substrate=s.name,
2715                                        attribute=[ 
2716                                            topdl.Attribute(
2717                                                attribute='ip4_address', 
2718                                                value=tbs[dt]
2719                                            )
2720                                        ]),
2721                                    topdl.Interface(
2722                                        substrate=\
2723                                            segment_substrate[st][dt].name,
2724                                        attribute=[
2725                                            topdl.Attribute(attribute='portal',
2726                                                value='true')
2727                                            ]
2728                                        ),
2729                                    ],
2730                                )
2731                        if self.fedkit: add_kit(portal, self.fedkit)
2732                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
2733
2734                        topo[st].elements.append(portal)
2735
2736            # Connect the gateway nodes into the topologies and clear out
2737            # substrates that are not in the topologies
2738            for tb in testbeds:
2739                topo[tb].incorporate_elements()
2740                topo[tb].substrates = \
2741                        [s for s in topo[tb].substrates \
2742                            if len(s.interfaces) >0]
2743
2744            # Copy the rpms and tarfiles to a distribution directory from
2745            # which the federants can retrieve them
2746            linkpath = "%s/software" %  expid
2747            softdir ="%s/%s" % ( self.repodir, linkpath)
2748            softmap = { }
2749            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
2750                    for p, t in l ])
2751            pkgs.update([x.location for e in top.elements \
2752                    for x in e.software])
2753            try:
2754                os.makedirs(softdir)
2755            except IOError, e:
2756                raise service_error(
2757                        "Cannot create software directory: %s" % e)
2758            for pkg in pkgs:
2759                loc = pkg
2760
2761                scheme, host, path = urlparse(loc)[0:3]
2762                dest = os.path.basename(path)
2763                if not scheme:
2764                    if not loc.startswith('/'):
2765                        loc = "/%s" % loc
2766                    loc = "file://%s" %loc
2767                try:
2768                    u = urlopen(loc)
2769                except Exception, e:
2770                    raise service_error(service_error.req, 
2771                            "Cannot open %s: %s" % (loc, e))
2772                try:
2773                    f = open("%s/%s" % (softdir, dest) , "w")
2774                    self.log.debug("Writing %s/%s" % (softdir,dest) )
2775                    data = u.read(4096)
2776                    while data:
2777                        f.write(data)
2778                        data = u.read(4096)
2779                    f.close()
2780                    u.close()
2781                except Exception, e:
2782                    raise service_error(service_error.internal,
2783                            "Could not copy %s: %s" % (loc, e))
2784                path = re.sub("/tmp", "", linkpath)
2785                # XXX
2786                softmap[pkg] = \
2787                        "https://users.isi.deterlab.net:23232/%s/%s" %\
2788                        ( path, dest)
2789
2790                # Allow the individual testbeds to access the software.
2791                for tb in tbparams.keys():
2792                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
2793                            "/%s/%s" % ( path, dest))
2794
2795            # Convert the software locations in the segments into the local
2796            # copies on this host
2797            for soft in [ s for tb in topo.values() \
2798                    for e in tb.elements \
2799                        if getattr(e, 'software', False) \
2800                            for s in e.software ]:
2801                if softmap.has_key(soft.location):
2802                    soft.location = softmap[soft.location]
2803
2804            vtopo = topdl.topology_to_vtopo(top)
2805            vis = self.genviz(vtopo)
2806
2807            # save federant information
2808            for k in allocated.keys():
2809                tbparams[k]['federant'] = {\
2810                        'name': [ { 'localname' : eid} ],\
2811                        'emulab': tbparams[k]['emulab'],\
2812                        'allocID' : tbparams[k]['allocID'],\
2813                        'master' : k == master,\
2814                    }
2815
2816            self.state_lock.acquire()
2817            self.state[eid]['vtopo'] = vtopo
2818            self.state[eid]['vis'] = vis
2819            self.state[expid]['federant'] = \
2820                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2821                        if tbparams[tb].has_key('federant') ]
2822            if self.state_filename: 
2823                self.write_state()
2824            self.state_lock.release()
2825        except service_error, e:
2826            # If something goes wrong in the parse (usually an access error)
2827            # clear the placeholder state.  From here on out the code delays
2828            # exceptions.  Failing at this point returns a fault to the remote
2829            # caller.
2830
2831            self.state_lock.acquire()
2832            del self.state[eid]
2833            del self.state[expid]
2834            if self.state_filename: self.write_state()
2835            self.state_lock.release()
2836            raise e
2837
2838
2839        # Start the background swapper and return the starting state.  From
2840        # here on out, the state will stick around a while.
2841
2842        # Let users touch the state
2843        self.auth.set_attribute(fid, expid)
2844        self.auth.set_attribute(expid, expid)
2845        # Override fedids can manipulate state as well
2846        for o in self.overrides:
2847            self.auth.set_attribute(o, expid)
2848
2849        # Create a logger that logs to the experiment's state object as well as
2850        # to the main log file.
2851        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2852        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2853        # XXX: there should be a global one of these rather than repeating the
2854        # code.
2855        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2856                    '%d %b %y %H:%M:%S'))
2857        alloc_log.addHandler(h)
2858       
2859        # XXX
2860        url_base = 'https://users.isi.deterlab.net:23232'
2861        attrs = [ 
2862                {
2863                    'attribute': 'ssh_pubkey', 
2864                    'value': '%s/%s/config/%s' % \
2865                            (url_base, expid, gw_pubkey_base)
2866                },
2867                {
2868                    'attribute': 'ssh_secretkey', 
2869                    'value': '%s/%s/config/%s' % \
2870                            (url_base, expid, gw_secretkey_base)
2871                },
2872                {
2873                    'attribute': 'hosts', 
2874                    'value': '%s/%s/config/hosts' % \
2875                            (url_base, expid)
2876                },
2877                {
2878                    'attribute': 'experiment_name',
2879                    'value': eid,
2880                },
2881            ]
2882
2883        # Start a thread to do the resource allocation
2884        t  = Thread(target=self.new_allocate_resources,
2885                args=(allocated, master, eid, expid, expcert, tbparams, 
2886                    topo, tmpdir, alloc_log, attrs),
2887                name=eid)
2888        t.start()
2889
2890        rv = {
2891                'experimentID': [
2892                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2893                ],
2894                'experimentStatus': 'starting',
2895                'experimentAccess': { 'X509' : expcert }
2896            }
2897
2898        return rv
2899   
2900    def get_experiment_fedid(self, key):
2901        """
2902        find the fedid associated with the localname key in the state database.
2903        """
2904
2905        rv = None
2906        self.state_lock.acquire()
2907        if self.state.has_key(key):
2908            if isinstance(self.state[key], dict):
2909                try:
2910                    kl = [ f['fedid'] for f in \
2911                            self.state[key]['experimentID']\
2912                                if f.has_key('fedid') ]
2913                except KeyError:
2914                    self.state_lock.release()
2915                    raise service_error(service_error.internal, 
2916                            "No fedid for experiment %s when getting "+\
2917                                    "fedid(!?)" % key)
2918                if len(kl) == 1:
2919                    rv = kl[0]
2920                else:
2921                    self.state_lock.release()
2922                    raise service_error(service_error.internal, 
2923                            "multiple fedids for experiment %s when " +\
2924                                    "getting fedid(!?)" % key)
2925            else:
2926                self.state_lock.release()
2927                raise service_error(service_error.internal, 
2928                        "Unexpected state for %s" % key)
2929        self.state_lock.release()
2930        return rv
2931
2932    def check_experiment_access(self, fid, key):
2933        """
2934        Confirm that the fid has access to the experiment.  Though a request
2935        may be made in terms of a local name, the access attribute is always
2936        the experiment's fedid.
2937        """
2938        if not isinstance(key, fedid):
2939            key = self.get_experiment_fedid(key)
2940
2941        if self.auth.check_attribute(fid, key):
2942            return True
2943        else:
2944            raise service_error(service_error.access, "Access Denied")
2945
2946
2947    def get_handler(self, path, fid):
2948        print "%s" %  path
2949        if self.auth.check_attribute(fid, path):
2950            return ("%s/%s" % (self.repodir, path), "application/binary")
2951        else:
2952            return (None, None)
2953
2954    def get_vtopo(self, req, fid):
2955        """
2956        Return the stored virtual topology for this experiment
2957        """
2958        rv = None
2959        state = None
2960
2961        req = req.get('VtopoRequestBody', None)
2962        if not req:
2963            raise service_error(service_error.req,
2964                    "Bad request format (no VtopoRequestBody)")
2965        exp = req.get('experiment', None)
2966        if exp:
2967            if exp.has_key('fedid'):
2968                key = exp['fedid']
2969                keytype = "fedid"
2970            elif exp.has_key('localname'):
2971                key = exp['localname']
2972                keytype = "localname"
2973            else:
2974                raise service_error(service_error.req, "Unknown lookup type")
2975        else:
2976            raise service_error(service_error.req, "No request?")
2977
2978        self.check_experiment_access(fid, key)
2979
2980        self.state_lock.acquire()
2981        if self.state.has_key(key):
2982            if self.state[key].has_key('vtopo'):
2983                rv = { 'experiment' : {keytype: key },\
2984                        'vtopo': self.state[key]['vtopo'],\
2985                    }
2986            else:
2987                state = self.state[key]['experimentStatus']
2988        self.state_lock.release()
2989
2990        if rv: return rv
2991        else: 
2992            if state:
2993                raise service_error(service_error.partial, 
2994                        "Not ready: %s" % state)
2995            else:
2996                raise service_error(service_error.req, "No such experiment")
2997
2998    def get_vis(self, req, fid):
2999        """
3000        Return the stored visualization for this experiment
3001        """
3002        rv = None
3003        state = None
3004
3005        req = req.get('VisRequestBody', None)
3006        if not req:
3007            raise service_error(service_error.req,
3008                    "Bad request format (no VisRequestBody)")
3009        exp = req.get('experiment', None)
3010        if exp:
3011            if exp.has_key('fedid'):
3012                key = exp['fedid']
3013                keytype = "fedid"
3014            elif exp.has_key('localname'):
3015                key = exp['localname']
3016                keytype = "localname"
3017            else:
3018                raise service_error(service_error.req, "Unknown lookup type")
3019        else:
3020            raise service_error(service_error.req, "No request?")
3021
3022        self.check_experiment_access(fid, key)
3023
3024        self.state_lock.acquire()
3025        if self.state.has_key(key):
3026            if self.state[key].has_key('vis'):
3027                rv =  { 'experiment' : {keytype: key },\
3028                        'vis': self.state[key]['vis'],\
3029                        }
3030            else:
3031                state = self.state[key]['experimentStatus']
3032        self.state_lock.release()
3033
3034        if rv: return rv
3035        else:
3036            if state:
3037                raise service_error(service_error.partial, 
3038                        "Not ready: %s" % state)
3039            else:
3040                raise service_error(service_error.req, "No such experiment")
3041
3042    def clean_info_response(self, rv):
3043        """
3044        Remove the information in the experiment's state object that is not in
3045        the info response.
3046        """
3047        # Remove the owner info (should always be there, but...)
3048        if rv.has_key('owner'): del rv['owner']
3049
3050        # Convert the log into the allocationLog parameter and remove the
3051        # log entry (with defensive programming)
3052        if rv.has_key('log'):
3053            rv['allocationLog'] = "".join(rv['log'])
3054            del rv['log']
3055        else:
3056            rv['allocationLog'] = ""
3057
3058        if rv['experimentStatus'] != 'active':
3059            if rv.has_key('federant'): del rv['federant']
3060        else:
3061            # remove the allocationID info from each federant
3062            for f in rv.get('federant', []):
3063                if f.has_key('allocID'): del f['allocID']
3064        return rv
3065
3066    def get_info(self, req, fid):
3067        """
3068        Return all the stored info about this experiment
3069        """
3070        rv = None
3071
3072        req = req.get('InfoRequestBody', None)
3073        if not req:
3074            raise service_error(service_error.req,
3075                    "Bad request format (no InfoRequestBody)")
3076        exp = req.get('experiment', None)
3077        if exp:
3078            if exp.has_key('fedid'):
3079                key = exp['fedid']
3080                keytype = "fedid"
3081            elif exp.has_key('localname'):
3082                key = exp['localname']
3083                keytype = "localname"
3084            else:
3085                raise service_error(service_error.req, "Unknown lookup type")
3086        else:
3087            raise service_error(service_error.req, "No request?")
3088
3089        self.check_experiment_access(fid, key)
3090
3091        # The state may be massaged by the service function that called
3092        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
3093        # state.
3094        self.state_lock.acquire()
3095        if self.state.has_key(key):
3096            rv = copy.deepcopy(self.state[key])
3097        self.state_lock.release()
3098
3099        if rv:
3100            return self.clean_info_response(rv)
3101        else:
3102            raise service_error(service_error.req, "No such experiment")
3103
3104    def get_multi_info(self, req, fid):
3105        """
3106        Return all the stored info that this fedid can access
3107        """
3108        rv = { 'info': [ ] }
3109
3110        self.state_lock.acquire()
3111        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
3112            self.check_experiment_access(fid, key)
3113
3114            if self.state.has_key(key):
3115                e = copy.deepcopy(self.state[key])
3116                e = self.clean_info_response(e)
3117                rv['info'].append(e)
3118        self.state_lock.release()
3119        return rv
3120
3121
3122    def terminate_experiment(self, req, fid):
3123        """
3124        Swap this experiment out on the federants and delete the shared
3125        information
3126        """
3127        tbparams = { }
3128        req = req.get('TerminateRequestBody', None)
3129        if not req:
3130            raise service_error(service_error.req,
3131                    "Bad request format (no TerminateRequestBody)")
3132        force = req.get('force', False)
3133        exp = req.get('experiment', None)
3134        if exp:
3135            if exp.has_key('fedid'):
3136                key = exp['fedid']
3137                keytype = "fedid"
3138            elif exp.has_key('localname'):
3139                key = exp['localname']
3140                keytype = "localname"
3141            else:
3142                raise service_error(service_error.req, "Unknown lookup type")
3143        else:
3144            raise service_error(service_error.req, "No request?")
3145
3146        self.check_experiment_access(fid, key)
3147
3148        dealloc_list = [ ]
3149
3150
3151        # Create a logger that logs to the dealloc_list as well as to the main
3152        # log file.
3153        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3154        h = logging.StreamHandler(self.list_log(dealloc_list))
3155        # XXX: there should be a global one of these rather than repeating the
3156        # code.
3157        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3158                    '%d %b %y %H:%M:%S'))
3159        dealloc_log.addHandler(h)
3160
3161        self.state_lock.acquire()
3162        fed_exp = self.state.get(key, None)
3163
3164        if fed_exp:
3165            # This branch of the conditional holds the lock to generate a
3166            # consistent temporary tbparams variable to deallocate experiments.
3167            # It releases the lock to do the deallocations and reacquires it to
3168            # remove the experiment state when the termination is complete.
3169
3170            # First make sure that the experiment creation is complete.
3171            status = fed_exp.get('experimentStatus', None)
3172
3173            if status:
3174                if status in ('starting', 'terminating'):
3175                    if not force:
3176                        self.state_lock.release()
3177                        raise service_error(service_error.partial, 
3178                                'Experiment still being created or destroyed')
3179                    else:
3180                        self.log.warning('Experiment in %s state ' % status + \
3181                                'being terminated by force.')
3182            else:
3183                # No status??? trouble
3184                self.state_lock.release()
3185                raise service_error(service_error.internal,
3186                        "Experiment has no status!?")
3187
3188            ids = []
3189            #  experimentID is a list of dicts that are self-describing
3190            #  identifiers.  This finds all the fedids and localnames - the
3191            #  keys of self.state - and puts them into ids.
3192            for id in fed_exp.get('experimentID', []):
3193                if id.has_key('fedid'): ids.append(id['fedid'])
3194                if id.has_key('localname'): ids.append(id['localname'])
3195
3196            # Construct enough of the tbparams to make the stop_segment calls
3197            # work
3198            for fed in fed_exp.get('federant', []):
3199                try:
3200                    for e in fed['name']:
3201                        eid = e.get('localname', None)
3202                        if eid: break
3203                    else:
3204                        continue
3205
3206                    p = fed['emulab']['project']
3207
3208                    project = p['name']['localname']
3209                    tb = p['testbed']['localname']
3210                    user = p['user'][0]['userID']['localname']
3211
3212                    domain = fed['emulab']['domain']
3213                    host  = fed['emulab']['ops']
3214                    aid = fed['allocID']
3215                except KeyError, e:
3216                    continue
3217                tbparams[tb] = {\
3218                        'user': user,\
3219                        'domain': domain,\
3220                        'project': project,\
3221                        'host': host,\
3222                        'eid': eid,\
3223                        'aid': aid,\
3224                    }
3225            fed_exp['experimentStatus'] = 'terminating'
3226            if self.state_filename: self.write_state()
3227            self.state_lock.release()
3228
3229            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3230            # then completes, so we can't wait if nothing starts.  So, no
3231            # tbparams, no start.
3232            if len(tbparams) > 0:
3233                thread_pool = self.thread_pool(self.nthreads)
3234                for tb in tbparams.keys():
3235                    # Create and start a thread to stop the segment
3236                    thread_pool.wait_for_slot()
3237                    t  = self.pooled_thread(\
3238                            target=self.stop_segment(log=dealloc_log,
3239                                keyfile=self.ssh_privkey_file, debug=self.debug), 
3240                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
3241                            pdata=thread_pool, trace_file=self.trace_file)
3242                    t.start()
3243                # Wait for completions
3244                thread_pool.wait_for_all_done()
3245
3246            # release the allocations (failed experiments have done this
3247            # already, and starting experiments may be in odd states, so we
3248            # ignore errors releasing those allocations
3249            try: 
3250                for tb in tbparams.keys():
3251                    self.release_access(tb, tbparams[tb]['aid'])
3252            except service_error, e:
3253                if status != 'failed' and not force:
3254                    raise e
3255
3256            # Remove the terminated experiment
3257            self.state_lock.acquire()
3258            for id in ids:
3259                if self.state.has_key(id): del self.state[id]
3260
3261            if self.state_filename: self.write_state()
3262            self.state_lock.release()
3263
3264            return { 
3265                    'experiment': exp , 
3266                    'deallocationLog': "".join(dealloc_list),
3267                    }
3268        else:
3269            # Don't forget to release the lock
3270            self.state_lock.release()
3271            raise service_error(service_error.req, "No saved state")
3272
3273    def new_terminate_experiment(self, req, fid):
3274        """
3275        Swap this experiment out on the federants and delete the shared
3276        information
3277        """
3278        tbparams = { }
3279        req = req.get('TerminateRequestBody', None)
3280        if not req:
3281            raise service_error(service_error.req,
3282                    "Bad request format (no TerminateRequestBody)")
3283        force = req.get('force', False)
3284        exp = req.get('experiment', None)
3285        if exp:
3286            if exp.has_key('fedid'):
3287                key = exp['fedid']
3288                keytype = "fedid"
3289            elif exp.has_key('localname'):
3290                key = exp['localname']
3291                keytype = "localname"
3292            else:
3293                raise service_error(service_error.req, "Unknown lookup type")
3294        else:
3295            raise service_error(service_error.req, "No request?")
3296
3297        self.check_experiment_access(fid, key)
3298
3299        dealloc_list = [ ]
3300
3301
3302        # Create a logger that logs to the dealloc_list as well as to the main
3303        # log file.
3304        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3305        h = logging.StreamHandler(self.list_log(dealloc_list))
3306        # XXX: there should be a global one of these rather than repeating the
3307        # code.
3308        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3309                    '%d %b %y %H:%M:%S'))
3310        dealloc_log.addHandler(h)
3311
3312        self.state_lock.acquire()
3313        fed_exp = self.state.get(key, None)
3314
3315        if fed_exp:
3316            # This branch of the conditional holds the lock to generate a
3317            # consistent temporary tbparams variable to deallocate experiments.
3318            # It releases the lock to do the deallocations and reacquires it to
3319            # remove the experiment state when the termination is complete.
3320
3321            # First make sure that the experiment creation is complete.
3322            status = fed_exp.get('experimentStatus', None)
3323
3324            if status:
3325                if status in ('starting', 'terminating'):
3326                    if not force:
3327                        self.state_lock.release()
3328                        raise service_error(service_error.partial, 
3329                                'Experiment still being created or destroyed')
3330                    else:
3331                        self.log.warning('Experiment in %s state ' % status + \
3332                                'being terminated by force.')
3333            else:
3334                # No status??? trouble
3335                self.state_lock.release()
3336                raise service_error(service_error.internal,
3337                        "Experiment has no status!?")
3338
3339            ids = []
3340            #  experimentID is a list of dicts that are self-describing
3341            #  identifiers.  This finds all the fedids and localnames - the
3342            #  keys of self.state - and puts them into ids.
3343            for id in fed_exp.get('experimentID', []):
3344                if id.has_key('fedid'): ids.append(id['fedid'])
3345                if id.has_key('localname'): ids.append(id['localname'])
3346
3347            # Collect the allocation/segment ids
3348            for fed in fed_exp.get('federant', []):
3349                try:
3350                    print "looking at %s" % fed
3351                    tb = fed['emulab']['project']['testbed']['localname']
3352                    aid = fed['allocID']
3353                except KeyError, e:
3354                    print "Key error: %s" %e
3355                    continue
3356                tbparams[tb] = aid
3357            fed_exp['experimentStatus'] = 'terminating'
3358            if self.state_filename: self.write_state()
3359            self.state_lock.release()
3360
3361            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3362            # then completes, so we can't wait if nothing starts.  So, no
3363            # tbparams, no start.
3364            if len(tbparams) > 0:
3365                thread_pool = self.thread_pool(self.nthreads)
3366                for tb in tbparams.keys():
3367                    # Create and start a thread to stop the segment
3368                    thread_pool.wait_for_slot()
3369                    uri = self.tbmap.get(tb, None)
3370                    t  = self.pooled_thread(\
3371                            target=self.new_terminate_segment(log=dealloc_log,
3372                                testbed=tb,
3373                                cert_file=self.cert_file, 
3374                                cert_pwd=self.cert_pwd,
3375                                trusted_certs=self.trusted_certs,
3376                                caller=self.call_TerminateSegment),
3377                            args=(uri, tbparams[tb]), name=tb,
3378                            pdata=thread_pool, trace_file=self.trace_file)
3379                    t.start()
3380                # Wait for completions
3381                thread_pool.wait_for_all_done()
3382
3383            # release the allocations (failed experiments have done this
3384            # already, and starting experiments may be in odd states, so we
3385            # ignore errors releasing those allocations
3386            try: 
3387                for tb in tbparams.keys():
3388                    self.release_access(tb, tbparams[tb])
3389            except service_error, e:
3390                if status != 'failed' and not force:
3391                    raise e
3392
3393            # Remove the terminated experiment
3394            self.state_lock.acquire()
3395            for id in ids:
3396                if self.state.has_key(id): del self.state[id]
3397
3398            if self.state_filename: self.write_state()
3399            self.state_lock.release()
3400
3401            return { 
3402                    'experiment': exp , 
3403                    'deallocationLog': "".join(dealloc_list),
3404                    }
3405        else:
3406            # Don't forget to release the lock
3407            self.state_lock.release()
3408            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.