source: fedd/federation/experiment_control.py @ c49d81b

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since c49d81b was db6b092, checked in by Ted Faber <faber@…>, 15 years ago

Checkpoint (and whitespace in experiment_control)

  • Property mode set to 100644
File size: 121.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_Ns2Split = service_caller('Ns2Split')
199
200    def __init__(self, config=None, auth=None):
201        """
202        Intialize the various attributes, most from the config object
203        """
204
205        def parse_tarfile_list(tf):
206            """
207            Parse a tarfile list from the configuration.  This is a set of
208            paths and tarfiles separated by spaces.
209            """
210            rv = [ ]
211            if tf is not None:
212                tl = tf.split()
213                while len(tl) > 1:
214                    p, t = tl[0:2]
215                    del tl[0:2]
216                    rv.append((p, t))
217            return rv
218
219        self.thread_with_rv = experiment_control_local.pooled_thread
220        self.thread_pool = experiment_control_local.thread_pool
221        self.list_log = experiment_control_local.list_log
222
223        self.cert_file = config.get("experiment_control", "cert_file")
224        if self.cert_file:
225            self.cert_pwd = config.get("experiment_control", "cert_pwd")
226        else:
227            self.cert_file = config.get("globals", "cert_file")
228            self.cert_pwd = config.get("globals", "cert_pwd")
229
230        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
231                or config.get("globals", "trusted_certs")
232
233        self.exp_stem = "fed-stem"
234        self.log = logging.getLogger("fedd.experiment_control")
235        set_log_level(config, "experiment_control", self.log)
236        self.muxmax = 2
237        self.nthreads = 2
238        self.randomize_experiments = False
239
240        self.splitter = None
241        self.ssh_keygen = "/usr/bin/ssh-keygen"
242        self.ssh_identity_file = None
243
244
245        self.debug = config.getboolean("experiment_control", "create_debug")
246        self.state_filename = config.get("experiment_control", 
247                "experiment_state")
248        self.splitter_url = config.get("experiment_control", "splitter_uri")
249        self.fedkit = parse_tarfile_list(\
250                config.get("experiment_control", "fedkit"))
251        self.gatewaykit = parse_tarfile_list(\
252                config.get("experiment_control", "gatewaykit"))
253        accessdb_file = config.get("experiment_control", "accessdb")
254
255        self.ssh_pubkey_file = config.get("experiment_control", 
256                "ssh_pubkey_file")
257        self.ssh_privkey_file = config.get("experiment_control",
258                "ssh_privkey_file")
259        # NB for internal master/slave ops, not experiment setup
260        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
261
262        self.overrides = set([])
263        ovr = config.get('experiment_control', 'overrides')
264        if ovr:
265            for o in ovr.split(","):
266                o = o.strip()
267                if o.startswith('fedid:'): o = o[len('fedid:'):]
268                self.overrides.add(fedid(hexstr=o))
269
270        self.state = { }
271        self.state_lock = Lock()
272        self.tclsh = "/usr/local/bin/otclsh"
273        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
274                config.get("experiment_control", "tcl_splitter",
275                        "/usr/testbed/lib/ns2ir/parse.tcl")
276        mapdb_file = config.get("experiment_control", "mapdb")
277        self.trace_file = sys.stderr
278
279        self.def_expstart = \
280                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
281                "/tmp/federate";
282        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
283                "FEDDIR/hosts";
284        self.def_gwstart = \
285                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
286                "/tmp/bridge.log";
287        self.def_mgwstart = \
288                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
289                "/tmp/bridge.log";
290        self.def_gwimage = "FBSD61-TUNNEL2";
291        self.def_gwtype = "pc";
292        self.local_access = { }
293
294        if auth:
295            self.auth = auth
296        else:
297            self.log.error(\
298                    "[access]: No authorizer initialized, creating local one.")
299            auth = authorizer()
300
301
302        if self.ssh_pubkey_file:
303            try:
304                f = open(self.ssh_pubkey_file, 'r')
305                self.ssh_pubkey = f.read()
306                f.close()
307            except IOError:
308                raise service_error(service_error.internal,
309                        "Cannot read sshpubkey")
310        else:
311            raise service_error(service_error.internal, 
312                    "No SSH public key file?")
313
314        if not self.ssh_privkey_file:
315            raise service_error(service_error.internal, 
316                    "No SSH public key file?")
317
318
319        if mapdb_file:
320            self.read_mapdb(mapdb_file)
321        else:
322            self.log.warn("[experiment_control] No testbed map, using defaults")
323            self.tbmap = { 
324                    'deter':'https://users.isi.deterlab.net:23235',
325                    'emulab':'https://users.isi.deterlab.net:23236',
326                    'ucb':'https://users.isi.deterlab.net:23237',
327                    }
328
329        if accessdb_file:
330                self.read_accessdb(accessdb_file)
331        else:
332            raise service_error(service_error.internal,
333                    "No accessdb specified in config")
334
335        # Grab saved state.  OK to do this w/o locking because it's read only
336        # and only one thread should be in existence that can see self.state at
337        # this point.
338        if self.state_filename:
339            self.read_state()
340
341        # Dispatch tables
342        self.soap_services = {\
343                'Create': soap_handler('Create', self.new_create_experiment),
344                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
345                'Vis': soap_handler('Vis', self.get_vis),
346                'Info': soap_handler('Info', self.get_info),
347                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
348                'Terminate': soap_handler('Terminate', 
349                    self.terminate_experiment),
350        }
351
352        self.xmlrpc_services = {\
353                'Create': xmlrpc_handler('Create', self.new_create_experiment),
354                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
355                'Vis': xmlrpc_handler('Vis', self.get_vis),
356                'Info': xmlrpc_handler('Info', self.get_info),
357                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
358                'Terminate': xmlrpc_handler('Terminate',
359                    self.terminate_experiment),
360        }
361
362    def copy_file(self, src, dest, size=1024):
363        """
364        Exceedingly simple file copy.
365        """
366        s = open(src,'r')
367        d = open(dest, 'w')
368
369        buf = "x"
370        while buf != "":
371            buf = s.read(size)
372            d.write(buf)
373        s.close()
374        d.close()
375
376    # Call while holding self.state_lock
377    def write_state(self):
378        """
379        Write a new copy of experiment state after copying the existing state
380        to a backup.
381
382        State format is a simple pickling of the state dictionary.
383        """
384        if os.access(self.state_filename, os.W_OK):
385            self.copy_file(self.state_filename, \
386                    "%s.bak" % self.state_filename)
387        try:
388            f = open(self.state_filename, 'w')
389            pickle.dump(self.state, f)
390        except IOError, e:
391            self.log.error("Can't write file %s: %s" % \
392                    (self.state_filename, e))
393        except pickle.PicklingError, e:
394            self.log.error("Pickling problem: %s" % e)
395        except TypeError, e:
396            self.log.error("Pickling problem (TypeError): %s" % e)
397
398    # Call while holding self.state_lock
399    def read_state(self):
400        """
401        Read a new copy of experiment state.  Old state is overwritten.
402
403        State format is a simple pickling of the state dictionary.
404        """
405        try:
406            f = open(self.state_filename, "r")
407            self.state = pickle.load(f)
408            self.log.debug("[read_state]: Read state from %s" % \
409                    self.state_filename)
410        except IOError, e:
411            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
412                    % (self.state_filename, e))
413        except pickle.UnpicklingError, e:
414            self.log.warning(("[read_state]: No saved state: " + \
415                    "Unpickling failed: %s") % e)
416       
417        for k in self.state.keys():
418            try:
419                # This list should only have one element in it, but phrasing it
420                # as a for loop doesn't cost much, really.  We have to find the
421                # fedid elements anyway.
422                for eid in [ f['fedid'] \
423                        for f in self.state[k]['experimentID']\
424                            if f.has_key('fedid') ]:
425                    self.auth.set_attribute(self.state[k]['owner'], eid)
426                    # allow overrides to control experiments as well
427                    for o in self.overrides:
428                        self.auth.set_attribute(o, eid)
429            except KeyError, e:
430                self.log.warning("[read_state]: State ownership or identity " +\
431                        "misformatted in %s: %s" % (self.state_filename, e))
432
433
434    def read_accessdb(self, accessdb_file):
435        """
436        Read the mapping from fedids that can create experiments to their name
437        in the 3-level access namespace.  All will be asserted from this
438        testbed and can include the local username and porject that will be
439        asserted on their behalf by this fedd.  Each fedid is also added to the
440        authorization system with the "create" attribute.
441        """
442        self.accessdb = {}
443        # These are the regexps for parsing the db
444        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
445        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
446                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
447        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
448                "\s*->\s*(" + name_expr + ")\s*$")
449        lineno = 0
450
451        # Parse the mappings and store in self.authdb, a dict of
452        # fedid -> (proj, user)
453        try:
454            f = open(accessdb_file, "r")
455            for line in f:
456                lineno += 1
457                line = line.strip()
458                if len(line) == 0 or line.startswith('#'):
459                    continue
460                m = project_line.match(line)
461                if m:
462                    fid = fedid(hexstr=m.group(1))
463                    project, user = m.group(2,3)
464                    if not self.accessdb.has_key(fid):
465                        self.accessdb[fid] = []
466                    self.accessdb[fid].append((project, user))
467                    continue
468
469                m = user_line.match(line)
470                if m:
471                    fid = fedid(hexstr=m.group(1))
472                    project = None
473                    user = m.group(2)
474                    if not self.accessdb.has_key(fid):
475                        self.accessdb[fid] = []
476                    self.accessdb[fid].append((project, user))
477                    continue
478                self.log.warn("[experiment_control] Error parsing access " +\
479                        "db %s at line %d" %  (accessdb_file, lineno))
480        except IOError:
481            raise service_error(service_error.internal,
482                    "Error opening/reading %s as experiment " +\
483                            "control accessdb" %  accessdb_file)
484        f.close()
485
486        # Initialize the authorization attributes
487        for fid in self.accessdb.keys():
488            self.auth.set_attribute(fid, 'create')
489
490    def read_mapdb(self, file):
491        """
492        Read a simple colon separated list of mappings for the
493        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
494        """
495
496        self.tbmap = { }
497        lineno =0
498        try:
499            f = open(file, "r")
500            for line in f:
501                lineno += 1
502                line = line.strip()
503                if line.startswith('#') or len(line) == 0:
504                    continue
505                try:
506                    label, url = line.split(':', 1)
507                    self.tbmap[label] = url
508                except ValueError, e:
509                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
510                            "map db: %s %s" % (lineno, line, e))
511        except IOError, e:
512            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
513                    "open %s: %s" % (file, e))
514        f.close()
515
516    class emulab_segment:
517        def __init__(self, log=None, keyfile=None, debug=False):
518            self.log = log or logging.getLogger(\
519                    'fedd.experiment_control.emulab_segment')
520            self.ssh_privkey_file = keyfile
521            self.debug = debug
522            self.ssh_exec="/usr/bin/ssh"
523            self.scp_exec = "/usr/bin/scp"
524            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
525
526        def scp_file(self, file, user, host, dest=""):
527            """
528            scp a file to the remote host.  If debug is set the action is only
529            logged.
530            """
531
532            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
533                    '-o', 'StrictHostKeyChecking yes', '-i', 
534                    self.ssh_privkey_file, file, 
535                    "%s@%s:%s" % (user, host, dest)]
536            rv = 0
537
538            try:
539                dnull = open("/dev/null", "w")
540            except IOError:
541                self.log.debug("[ssh_file]: failed to open " + \
542                        "/dev/null for redirect")
543                dnull = Null
544
545            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
546            if not self.debug:
547                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
548                        close_fds=True)
549
550            return rv == 0
551
552        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
553            """
554            Run a remote command on host as user.  If debug is set, the action
555            is only logged.  Commands are run without stdin, to avoid stray
556            SIGTTINs.
557            """
558            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
559                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
560                    (self.ssh_exec, self.ssh_privkey_file, 
561                            user, host, cmd)
562
563            try:
564                dnull = open("/dev/null", "w")
565            except IOError:
566                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
567                        "for redirect")
568                dnull = Null
569
570            self.log.debug("[ssh_cmd]: %s" % sh_str)
571            if not self.debug:
572                if dnull:
573                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
574                            close_fds=True)
575                else:
576                    sub = Popen(sh_str, shell=True,
577                            close_fds=True)
578                if timeout:
579                    i = 0
580                    rv = sub.poll()
581                    while i < timeout:
582                        if rv is not None: break
583                        else:
584                            time.sleep(1)
585                            rv = sub.poll()
586                            i += 1
587                    else:
588                        self.log.debug("Process exceeded runtime: %s" % sh_str)
589                        os.kill(sub.pid, signal.SIGKILL)
590                        raise self.ssh_cmd_timeout();
591                    return rv == 0
592                else:
593                    return sub.wait() == 0
594            else:
595                if timeout == 0:
596                    self.log.debug("debug timeout raised on %s " % sh_str)
597                    raise self.ssh_cmd_timeout()
598                else:
599                    return True
600
601    class start_segment(emulab_segment):
602        def __init__(self, log=None, keyfile=None, debug=False):
603            experiment_control_local.emulab_segment.__init__(self,
604                    log=log, keyfile=keyfile, debug=debug)
605
606        def create_config_tree(self, src_dir, dest_dir, script):
607            """
608            Append commands to script that will create the directory hierarchy
609            on the remote federant.
610            """
611
612            if os.path.isdir(src_dir):
613                print >>script, "mkdir -p %s" % dest_dir
614                print >>script, "chmod 770 %s" % dest_dir
615
616                for f in os.listdir(src_dir):
617                    if os.path.isdir(f):
618                        self.create_config_tree("%s/%s" % (src_dir, f), 
619                                "%s/%s" % (dest_dir, f), script)
620            else:
621                self.log.debug("[create_config_tree]: Not a directory: %s" \
622                        % src_dir)
623
624        def ship_configs(self, host, user, src_dir, dest_dir):
625            """
626            Copy federant-specific configuration files to the federant.
627            """
628            for f in os.listdir(src_dir):
629                if os.path.isdir(f):
630                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
631                            "%s/%s" % (dest_dir, f)):
632                        return False
633                else:
634                    if not self.scp_file("%s/%s" % (src_dir, f), 
635                            user, host, dest_dir):
636                        return False
637            return True
638
639        def get_state(self, user, host, tb, pid, eid):
640            # command to test experiment state
641            expinfo_exec = "/usr/testbed/bin/expinfo" 
642            # Regular expressions to parse the expinfo response
643            state_re = re.compile("State:\s+(\w+)")
644            no_exp_re = re.compile("^No\s+such\s+experiment")
645            swapping_re = re.compile("^No\s+information\s+available.")
646            state = None    # Experiment state parsed from expinfo
647            # The expinfo ssh command.  Note the identity restriction to use
648            # only the identity provided in the pubkey given.
649            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
650                    'StrictHostKeyChecking yes', '-i', 
651                    self.ssh_privkey_file, "%s@%s" % (user, host), 
652                    expinfo_exec, pid, eid]
653
654            dev_null = None
655            try:
656                dev_null = open("/dev/null", "a")
657            except IOError, e:
658                self.log.error("[get_state]: can't open /dev/null: %s" %e)
659
660            if self.debug:
661                state = 'swapped'
662                rv = 0
663            else:
664                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
665                        close_fds=True)
666                for line in status.stdout:
667                    m = state_re.match(line)
668                    if m: state = m.group(1)
669                    else:
670                        for reg, st in ((no_exp_re, "none"),
671                                (swapping_re, "swapping")):
672                            m = reg.match(line)
673                            if m: state = st
674                rv = status.wait()
675
676            # If the experiment is not present the subcommand returns a
677            # non-zero return value.  If we successfully parsed a "none"
678            # outcome, ignore the return code.
679            if rv != 0 and state != 'none':
680                raise service_error(service_error.internal,
681                        "Cannot get status of segment %s:%s/%s" % \
682                                (tb, pid, eid))
683            elif state not in ('active', 'swapped', 'swapping', 'none'):
684                raise service_error(service_error.internal,
685                        "Cannot get status of segment %s:%s/%s" % \
686                                (tb, pid, eid))
687            else: return state
688
689
690        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
691            """
692            Start a sub-experiment on a federant.
693
694            Get the current state, modify or create as appropriate, ship data
695            and configs and start the experiment.  There are small ordering
696            differences based on the initial state of the sub-experiment.
697            """
698            # ops node in the federant
699            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
700            user = tbparams[tb]['user']     # federant user
701            pid = tbparams[tb]['project']   # federant project
702            # XXX
703            base_confs = ( "hosts",)
704            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
705            # Configuration directories on the remote machine
706            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
707            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
708            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
709
710            state = self.get_state(user, host, tb, pid, eid)
711
712            self.log.debug("[start_segment]: %s: %s" % (tb, state))
713            self.log.info("[start_segment]:transferring experiment to %s" % tb)
714
715            if not self.scp_file("%s/%s/%s" % \
716                    (tmpdir, tb, tclfile), user, host):
717                return False
718           
719            if state == 'none':
720                # Create a null copy of the experiment so that we capture any
721                # logs there if the modify fails.  Emulab software discards the
722                # logs from a failed startexp
723                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
724                    return False
725                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
726                timedout = False
727                try:
728                    if not self.ssh_cmd(user, host,
729                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
730                            "-e %s null.tcl") % (pid, eid), "startexp",
731                            timeout=60 * 10):
732                        return False
733                except self.ssh_cmd_timeout:
734                    timedout = True
735
736                if timedout:
737                    state = self.get_state(user, host, tb, pid, eid)
738                    if state != "swapped":
739                        return False
740
741           
742            # Open up a temporary file to contain a script for setting up the
743            # filespace for the new experiment.
744            self.log.info("[start_segment]: creating script file")
745            try:
746                sf, scriptname = tempfile.mkstemp()
747                scriptfile = os.fdopen(sf, 'w')
748            except IOError:
749                return False
750
751            scriptbase = os.path.basename(scriptname)
752
753            # Script the filesystem changes
754            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
755            # Clear and create the tarfiles and rpm directories
756            for d in (tarfiles_dir, rpms_dir):
757                print >>scriptfile, "/bin/rm -rf %s/*" % d
758                print >>scriptfile, "mkdir -p %s" % d
759            print >>scriptfile, 'mkdir -p %s' % proj_dir
760            self.create_config_tree("%s/%s" % (tmpdir, tb),
761                    proj_dir, scriptfile)
762            if os.path.isdir("%s/tarfiles" % tmpdir):
763                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
764                        scriptfile)
765            if os.path.isdir("%s/rpms" % tmpdir):
766                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
767                        scriptfile)
768            print >>scriptfile, "rm -f %s" % scriptbase
769            scriptfile.close()
770
771            # Move the script to the remote machine
772            # XXX: could collide tempfile names on the remote host
773            if self.scp_file(scriptname, user, host, scriptbase):
774                os.remove(scriptname)
775            else:
776                return False
777
778            # Execute the script (and the script's last line deletes it)
779            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
780                return False
781
782            for f in base_confs:
783                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
784                        "%s/%s" % (proj_dir, f)):
785                    return False
786            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
787                    proj_dir):
788                return False
789            if os.path.isdir("%s/tarfiles" % tmpdir):
790                if not self.ship_configs(host, user,
791                        "%s/tarfiles" % tmpdir, tarfiles_dir):
792                    return False
793            if os.path.isdir("%s/rpms" % tmpdir):
794                if not self.ship_configs(host, user,
795                        "%s/rpms" % tmpdir, tarfiles_dir):
796                    return False
797            # Stage the new configuration (active experiments will stay swapped
798            # in now)
799            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
800            try:
801                if not self.ssh_cmd(user, host,
802                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
803                                (pid, eid, tclfile),
804                        "modexp", timeout= 60 * 10):
805                    return False
806            except self.ssh_cmd_timeout:
807                self.log.error("Modify command failed to complete in time")
808                # There's really no way to see if this succeeded or failed, so
809                # if it hangs, assume the worst.
810                return False
811            # Active experiments are still swapped, this swaps the others in.
812            if state != 'active':
813                self.log.info("[start_segment]: Swapping %s in on %s" % \
814                        (eid, tb))
815                timedout = False
816                try:
817                    if not self.ssh_cmd(user, host,
818                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
819                            "swapexp", timeout=10*60):
820                        return False
821                except self.ssh_cmd_timeout:
822                    timedout = True
823               
824                # If the command was terminated, but completed successfully,
825                # report success.
826                if timedout:
827                    self.log.debug("[start_segment]: swapin timed out " +\
828                            "checking state")
829                    state = self.get_state(user, host, tb, pid, eid)
830                    self.log.debug("[start_segment]: state is %s" % state)
831                    return state == 'active'
832            # Everything has gone OK.
833            return True
834
835    class stop_segment(emulab_segment):
836        def __init__(self, log=None, keyfile=None, debug=False):
837            experiment_control_local.emulab_segment.__init__(self,
838                    log=log, keyfile=keyfile, debug=debug)
839
840        def __call__(self, tb, eid, tbparams):
841            """
842            Stop a sub experiment by calling swapexp on the federant
843            """
844            user = tbparams[tb]['user']
845            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
846            pid = tbparams[tb]['project']
847
848            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
849            rv = False
850            try:
851                # Clean out tar files: we've gone over quota in the past
852                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
853                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
854                        (pid, eid))
855                rv = self.ssh_cmd(user, host,
856                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
857            except self.ssh_cmd_timeout:
858                rv = False
859            return rv
860
861       
862    def generate_ssh_keys(self, dest, type="rsa" ):
863        """
864        Generate a set of keys for the gateways to use to talk.
865
866        Keys are of type type and are stored in the required dest file.
867        """
868        valid_types = ("rsa", "dsa")
869        t = type.lower();
870        if t not in valid_types: raise ValueError
871        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
872
873        try:
874            trace = open("/dev/null", "w")
875        except IOError:
876            raise service_error(service_error.internal,
877                    "Cannot open /dev/null??");
878
879        # May raise CalledProcessError
880        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
881        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
882        if rv != 0:
883            raise service_error(service_error.internal, 
884                    "Cannot generate nonce ssh keys.  %s return code %d" \
885                            % (self.ssh_keygen, rv))
886
887    def gentopo(self, str):
888        """
889        Generate the topology dtat structure from the splitter's XML
890        representation of it.
891
892        The topology XML looks like:
893            <experiment>
894                <nodes>
895                    <node><vname></vname><ips>ip1:ip2</ips></node>
896                </nodes>
897                <lans>
898                    <lan>
899                        <vname></vname><vnode></vnode><ip></ip>
900                        <bandwidth></bandwidth><member>node:port</member>
901                    </lan>
902                </lans>
903        """
904        class topo_parse:
905            """
906            Parse the topology XML and create the dats structure.
907            """
908            def __init__(self):
909                # Typing of the subelements for data conversion
910                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
911                self.int_subelements = ( 'bandwidth',)
912                self.float_subelements = ( 'delay',)
913                # The final data structure
914                self.nodes = [ ]
915                self.lans =  [ ]
916                self.topo = { \
917                        'node': self.nodes,\
918                        'lan' : self.lans,\
919                    }
920                self.element = { }  # Current element being created
921                self.chars = ""     # Last text seen
922
923            def end_element(self, name):
924                # After each sub element the contents is added to the current
925                # element or to the appropriate list.
926                if name == 'node':
927                    self.nodes.append(self.element)
928                    self.element = { }
929                elif name == 'lan':
930                    self.lans.append(self.element)
931                    self.element = { }
932                elif name in self.str_subelements:
933                    self.element[name] = self.chars
934                    self.chars = ""
935                elif name in self.int_subelements:
936                    self.element[name] = int(self.chars)
937                    self.chars = ""
938                elif name in self.float_subelements:
939                    self.element[name] = float(self.chars)
940                    self.chars = ""
941
942            def found_chars(self, data):
943                self.chars += data.rstrip()
944
945
946        tp = topo_parse();
947        parser = xml.parsers.expat.ParserCreate()
948        parser.EndElementHandler = tp.end_element
949        parser.CharacterDataHandler = tp.found_chars
950
951        parser.Parse(str)
952
953        return tp.topo
954       
955
956    def genviz(self, topo):
957        """
958        Generate the visualization the virtual topology
959        """
960
961        neato = "/usr/local/bin/neato"
962        # These are used to parse neato output and to create the visualization
963        # file.
964        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
965        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
966                "%s</type></node>"
967
968        try:
969            # Node names
970            nodes = [ n['vname'] for n in topo['node'] ]
971            topo_lans = topo['lan']
972        except KeyError:
973            raise service_error(service_error.internal, "Bad topology")
974
975        lans = { }
976        links = { }
977
978        # Walk through the virtual topology, organizing the connections into
979        # 2-node connections (links) and more-than-2-node connections (lans).
980        # When a lan is created, it's added to the list of nodes (there's a
981        # node in the visualization for the lan).
982        for l in topo_lans:
983            if links.has_key(l['vname']):
984                if len(links[l['vname']]) < 2:
985                    links[l['vname']].append(l['vnode'])
986                else:
987                    nodes.append(l['vname'])
988                    lans[l['vname']] = links[l['vname']]
989                    del links[l['vname']]
990                    lans[l['vname']].append(l['vnode'])
991            elif lans.has_key(l['vname']):
992                lans[l['vname']].append(l['vnode'])
993            else:
994                links[l['vname']] = [ l['vnode'] ]
995
996
997        # Open up a temporary file for dot to turn into a visualization
998        try:
999            df, dotname = tempfile.mkstemp()
1000            dotfile = os.fdopen(df, 'w')
1001        except IOError:
1002            raise service_error(service_error.internal,
1003                    "Failed to open file in genviz")
1004
1005        try:
1006            dnull = open('/dev/null', 'w')
1007        except IOError:
1008            service_error(service_error.internal,
1009                    "Failed to open /dev/null in genviz")
1010
1011        # Generate a dot/neato input file from the links, nodes and lans
1012        try:
1013            print >>dotfile, "graph G {"
1014            for n in nodes:
1015                print >>dotfile, '\t"%s"' % n
1016            for l in links.keys():
1017                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1018            for l in lans.keys():
1019                for n in lans[l]:
1020                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1021            print >>dotfile, "}"
1022            dotfile.close()
1023        except TypeError:
1024            raise service_error(service_error.internal,
1025                    "Single endpoint link in vtopo")
1026        except IOError:
1027            raise service_error(service_error.internal, "Cannot write dot file")
1028
1029        # Use dot to create a visualization
1030        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1031                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1032                close_fds=True)
1033        dnull.close()
1034
1035        # Translate dot to vis format
1036        vis_nodes = [ ]
1037        vis = { 'node': vis_nodes }
1038        for line in dot.stdout:
1039            m = vis_re.match(line)
1040            if m:
1041                vn = m.group(1)
1042                vis_node = {'name': vn, \
1043                        'x': float(m.group(2)),\
1044                        'y' : float(m.group(3)),\
1045                    }
1046                if vn in links.keys() or vn in lans.keys():
1047                    vis_node['type'] = 'lan'
1048                else:
1049                    vis_node['type'] = 'node'
1050                vis_nodes.append(vis_node)
1051        rv = dot.wait()
1052
1053        os.remove(dotname)
1054        if rv == 0 : return vis
1055        else: return None
1056
1057    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1058            access_user):
1059        """
1060        Get access to testbed through fedd and set the parameters for that tb
1061        """
1062        uri = self.tbmap.get(tb, None)
1063        if not uri:
1064            raise service_error(serice_error.server_config, 
1065                    "Unknown testbed: %s" % tb)
1066
1067        # currently this lumps all users into one service access group
1068        service_keys = [ a for u in user \
1069                for a in u.get('access', []) \
1070                    if a.has_key('sshPubkey')]
1071
1072        if len(service_keys) == 0:
1073            raise service_error(service_error.req, 
1074                    "Must have at least one SSH pubkey for services")
1075
1076
1077        for p, u in access_user:
1078            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1079                    "to %s") %  ((p or "None"), u, uri))
1080
1081            if p:
1082                # Request with user and project specified
1083                req = {\
1084                        'destinationTestbed' : { 'uri' : uri },
1085                        'project': { 
1086                            'name': {'localname': p},
1087                            'user': [ {'userID': { 'localname': u } } ],
1088                            },
1089                        'user':  user,
1090                        'allocID' : { 'localname': 'test' },
1091                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1092                        'serviceAccess' : service_keys
1093                    }
1094            else:
1095                # Request with only user specified
1096                req = {\
1097                        'destinationTestbed' : { 'uri' : uri },
1098                        'user':  [ {'userID': { 'localname': u } } ],
1099                        'allocID' : { 'localname': 'test' },
1100                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1101                        'serviceAccess' : service_keys
1102                    }
1103
1104            if tb == master:
1105                # NB, the export_project parameter is a dict that includes
1106                # the type
1107                req['exportProject'] = export_project
1108
1109            # node resources if any
1110            if nodes != None and len(nodes) > 0:
1111                rnodes = [ ]
1112                for n in nodes:
1113                    rn = { }
1114                    image, hw, count = n.split(":")
1115                    if image: rn['image'] = [ image ]
1116                    if hw: rn['hardware'] = [ hw ]
1117                    if count and int(count) >0 : rn['count'] = int(count)
1118                    rnodes.append(rn)
1119                req['resources']= { }
1120                req['resources']['node'] = rnodes
1121
1122            try:
1123                if self.local_access.has_key(uri):
1124                    # Local access call
1125                    req = { 'RequestAccessRequestBody' : req }
1126                    r = self.local_access[uri].RequestAccess(req, 
1127                            fedid(file=self.cert_file))
1128                    r = { 'RequestAccessResponseBody' : r }
1129                else:
1130                    r = self.call_RequestAccess(uri, req, 
1131                            self.cert_file, self.cert_pwd, self.trusted_certs)
1132            except service_error, e:
1133                if e.code == service_error.access:
1134                    self.log.debug("[get_access] Access denied")
1135                    r = None
1136                    continue
1137                else:
1138                    raise e
1139
1140            if r.has_key('RequestAccessResponseBody'):
1141                # Through to here we have a valid response, not a fault.
1142                # Access denied is a fault, so something better or worse than
1143                # access denied has happened.
1144                r = r['RequestAccessResponseBody']
1145                self.log.debug("[get_access] Access granted")
1146                break
1147            else:
1148                raise service_error(service_error.protocol,
1149                        "Bad proxy response")
1150       
1151        if not r:
1152            raise service_error(service_error.access, 
1153                    "Access denied by %s (%s)" % (tb, uri))
1154
1155        e = r['emulab']
1156        p = e['project']
1157        tbparam[tb] = { 
1158                "boss": e['boss'],
1159                "host": e['ops'],
1160                "domain": e['domain'],
1161                "fs": e['fileServer'],
1162                "eventserver": e['eventServer'],
1163                "project": unpack_id(p['name']),
1164                "emulab" : e,
1165                "allocID" : r['allocID'],
1166                }
1167        # Make the testbed name be the label the user applied
1168        p['testbed'] = {'localname': tb }
1169
1170        for u in p['user']:
1171            role = u.get('role', None)
1172            if role == 'experimentCreation':
1173                tbparam[tb]['user'] = unpack_id(u['userID'])
1174                break
1175        else:
1176            raise service_error(service_error.internal, 
1177                    "No createExperimentUser from %s" %tb)
1178
1179        # Add attributes to barameter space.  We don't allow attributes to
1180        # overlay any parameters already installed.
1181        for a in e['fedAttr']:
1182            try:
1183                if a['attribute'] and isinstance(a['attribute'], basestring)\
1184                        and not tbparam[tb].has_key(a['attribute'].lower()):
1185                    tbparam[tb][a['attribute'].lower()] = a['value']
1186            except KeyError:
1187                self.log.error("Bad attribute in response: %s" % a)
1188       
1189    def release_access(self, tb, aid):
1190        """
1191        Release access to testbed through fedd
1192        """
1193
1194        uri = self.tbmap.get(tb, None)
1195        if not uri:
1196            raise service_error(serice_error.server_config, 
1197                    "Unknown testbed: %s" % tb)
1198
1199        if self.local_access.has_key(uri):
1200            resp = self.local_access[uri].ReleaseAccess(\
1201                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1202                    fedid(file=self.cert_file))
1203            resp = { 'ReleaseAccessResponseBody': resp } 
1204        else:
1205            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1206                    self.cert_file, self.cert_pwd, self.trusted_certs)
1207
1208        # better error coding
1209
1210    def remote_splitter(self, uri, desc, master):
1211
1212        req = {
1213                'description' : { 'ns2description': desc },
1214                'master': master,
1215                'include_fedkit': bool(self.fedkit),
1216                'include_gatewaykit': bool(self.gatewaykit)
1217            }
1218
1219        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1220                self.trusted_certs)
1221
1222        if r.has_key('Ns2SplitResponseBody'):
1223            r = r['Ns2SplitResponseBody']
1224            if r.has_key('output'):
1225                return r['output'].splitlines()
1226            else:
1227                raise service_error(service_error.protocol, 
1228                        "Bad splitter response (no output)")
1229        else:
1230            raise service_error(service_error.protocol, "Bad splitter response")
1231       
1232    class current_testbed:
1233        """
1234        Object for collecting the current testbed description.  The testbed
1235        description is saved to a file with the local testbed variables
1236        subsittuted line by line.
1237        """
1238        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1239            def tar_list_to_string(tl):
1240                if tl is None: return None
1241
1242                rv = ""
1243                for t in tl:
1244                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1245                            (t[0], os.path.basename(t[1]))
1246                return rv
1247
1248
1249            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1250            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1251            self.current_testbed = None
1252            self.testbed_file = None
1253
1254            self.def_expstart = \
1255                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1256            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1257            self.def_gwstart = \
1258                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1259            self.def_mgwstart = \
1260                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1261            self.def_gwimage = "FBSD61-TUNNEL2";
1262            self.def_gwtype = "pc";
1263            self.def_mgwcmd = '# '
1264            self.def_mgwcmdparams = ''
1265            self.def_gwcmd = '# '
1266            self.def_gwcmdparams = ''
1267
1268            self.eid = eid
1269            self.tmpdir = tmpdir
1270            # Convert fedkit and gateway kit (which are lists of tuples) into a
1271            # substituition string.
1272            self.fedkit = tar_list_to_string(fedkit)
1273            self.gatewaykit = tar_list_to_string(gatewaykit)
1274
1275        def __call__(self, line, master, allocated, tbparams):
1276            # Capture testbed topology descriptions
1277            if self.current_testbed == None:
1278                m = self.begin_testbed.match(line)
1279                if m != None:
1280                    self.current_testbed = m.group(1)
1281                    if self.current_testbed == None:
1282                        raise service_error(service_error.req,
1283                                "Bad request format (unnamed testbed)")
1284                    allocated[self.current_testbed] = \
1285                            allocated.get(self.current_testbed,0) + 1
1286                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1287                    if not os.path.exists(tb_dir):
1288                        try:
1289                            os.mkdir(tb_dir)
1290                        except IOError:
1291                            raise service_error(service_error.internal,
1292                                    "Cannot create %s" % tb_dir)
1293                    try:
1294                        self.testbed_file = open("%s/%s.%s.tcl" %
1295                                (tb_dir, self.eid, self.current_testbed), 'w')
1296                    except IOError:
1297                        self.testbed_file = None
1298                    return True
1299                else: return False
1300            else:
1301                m = self.end_testbed.match(line)
1302                if m != None:
1303                    if m.group(1) != self.current_testbed:
1304                        raise service_error(service_error.internal, 
1305                                "Mismatched testbed markers!?")
1306                    if self.testbed_file != None: 
1307                        self.testbed_file.close()
1308                        self.testbed_file = None
1309                    self.current_testbed = None
1310                elif self.testbed_file:
1311                    # Substitute variables and put the line into the local
1312                    # testbed file.
1313                    gwtype = tbparams[self.current_testbed].get(\
1314                            'connectortype', self.def_gwtype)
1315                    gwimage = tbparams[self.current_testbed].get(\
1316                            'connectorimage', self.def_gwimage)
1317                    mgwstart = tbparams[self.current_testbed].get(\
1318                            'masterconnectorstartcmd', self.def_mgwstart)
1319                    mexpstart = tbparams[self.current_testbed].get(\
1320                            'masternodestartcmd', self.def_mexpstart)
1321                    gwstart = tbparams[self.current_testbed].get(\
1322                            'slaveconnectorstartcmd', self.def_gwstart)
1323                    expstart = tbparams[self.current_testbed].get(\
1324                            'slavenodestartcmd', self.def_expstart)
1325                    project = tbparams[self.current_testbed].get('project')
1326                    gwcmd = tbparams[self.current_testbed].get(\
1327                            'slaveconnectorcmd', self.def_gwcmd)
1328                    gwcmdparams = tbparams[self.current_testbed].get(\
1329                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1330                    mgwcmd = tbparams[self.current_testbed].get(\
1331                            'masterconnectorcmd', self.def_gwcmd)
1332                    mgwcmdparams = tbparams[self.current_testbed].get(\
1333                            'masterconnectorcmdparams', self.def_gwcmdparams)
1334                    line = re.sub("GWTYPE", gwtype, line)
1335                    line = re.sub("GWIMAGE", gwimage, line)
1336                    if self.current_testbed == master:
1337                        line = re.sub("GWSTART", mgwstart, line)
1338                        line = re.sub("EXPSTART", mexpstart, line)
1339                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1340                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1341                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1342                    else:
1343                        line = re.sub("GWSTART", gwstart, line)
1344                        line = re.sub("EXPSTART", expstart, line)
1345                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1346                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1347                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1348                    #These expansions contain EID and PROJDIR.  NB these are
1349                    # local fedkit and gatewaykit, which are strings.
1350                    if self.fedkit:
1351                        line = re.sub("FEDKIT", self.fedkit, line)
1352                    if self.gatewaykit:
1353                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1354                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1355                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1356                    line = re.sub("EID", self.eid, line)
1357                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1358                            (project, self.eid), line)
1359                    print >>self.testbed_file, line
1360                return True
1361
1362    class allbeds:
1363        """
1364        Process the Allbeds section.  Get access to each federant and save the
1365        parameters in tbparams
1366        """
1367        def __init__(self, get_access):
1368            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1369            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1370            self.in_allbeds = False
1371            self.get_access = get_access
1372
1373        def __call__(self, line, user, tbparams, master, export_project,
1374                access_user):
1375            # Testbed access parameters
1376            if not self.in_allbeds:
1377                if self.begin_allbeds.match(line):
1378                    self.in_allbeds = True
1379                    return True
1380                else:
1381                    return False
1382            else:
1383                if self.end_allbeds.match(line):
1384                    self.in_allbeds = False
1385                else:
1386                    nodes = line.split('|')
1387                    tb = nodes.pop(0)
1388                    self.get_access(tb, nodes, user, tbparams, master,
1389                            export_project, access_user)
1390                return True
1391
1392    class gateways:
1393        def __init__(self, eid, master, tmpdir, gw_pubkey,
1394                gw_secretkey, copy_file, fedkit):
1395            self.begin_gateways = \
1396                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1397            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1398            self.current_gateways = None
1399            self.control_gateway = None
1400            self.active_end = { }
1401
1402            self.eid = eid
1403            self.master = master
1404            self.tmpdir = tmpdir
1405            self.gw_pubkey_base = gw_pubkey
1406            self.gw_secretkey_base = gw_secretkey
1407
1408            self.copy_file = copy_file
1409            self.fedkit = fedkit
1410
1411
1412        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1413                active_end, tbparams, dtb, myname, desthost, type):
1414            """
1415            Produce a gateway configuration file from a gateways line.
1416            """
1417
1418            sproject = tbparams[gw].get('project', 'project')
1419            dproject = tbparams[dtb].get('project', 'project')
1420            sdomain = ".%s.%s%s" % (eid, sproject,
1421                    tbparams[gw].get('domain', ".example.com"))
1422            ddomain = ".%s.%s%s" % (eid, dproject,
1423                    tbparams[dtb].get('domain', ".example.com"))
1424            boss = tbparams[master].get('boss', "boss")
1425            fs = tbparams[master].get('fs', "fs")
1426            event_server = "%s%s" % \
1427                    (tbparams[gw].get('eventserver', "event_server"),
1428                            tbparams[gw].get('domain', "example.com"))
1429            remote_event_server = "%s%s" % \
1430                    (tbparams[dtb].get('eventserver', "event_server"),
1431                            tbparams[dtb].get('domain', "example.com"))
1432            seer_control = "%s%s" % \
1433                    (tbparams[gw].get('control', "control"), sdomain)
1434            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1435
1436            if self.fedkit:
1437                remote_script_dir = "/usr/local/federation/bin"
1438                local_script_dir = "/usr/local/federation/bin"
1439            else:
1440                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1441                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1442
1443            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1444            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1445            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1446
1447            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1448            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1449
1450            # translate to lower case so the `hostname` hack for specifying
1451            # configuration files works.
1452            conf_file = conf_file.lower();
1453            remote_conf_file = remote_conf_file.lower();
1454
1455            if dtb == master:
1456                active = "false"
1457            elif gw == master:
1458                active = "true"
1459            elif active_end.has_key('%s-%s' % (dtb, gw)):
1460                active = "false"
1461            else:
1462                active_end['%s-%s' % (gw, dtb)] = 1
1463                active = "true"
1464
1465            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1466            print >>gwconfig, "Active: %s" % active
1467            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1468            if tunnel_iface:
1469                print >>gwconfig, "Interface: %s" % tunnel_iface
1470            print >>gwconfig, "BossName: %s" % boss
1471            print >>gwconfig, "FsName: %s" % fs
1472            print >>gwconfig, "EventServerName: %s" % event_server
1473            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1474            print >>gwconfig, "SeerControl: %s" % seer_control
1475            print >>gwconfig, "Type: %s" % type
1476            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1477            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1478                    local_script_dir
1479            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1480            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1481            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1482                    (remote_conf_dir, remote_conf_file)
1483            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1484            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1485            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1486            gwconfig.close()
1487
1488            return active == "true"
1489
1490        def __call__(self, line, allocated, tbparams):
1491            # Process gateways
1492            if not self.current_gateways:
1493                m = self.begin_gateways.match(line)
1494                if m:
1495                    self.current_gateways = m.group(1)
1496                    if allocated.has_key(self.current_gateways):
1497                        # This test should always succeed
1498                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1499                        if not os.path.exists(tb_dir):
1500                            try:
1501                                os.mkdir(tb_dir)
1502                            except IOError:
1503                                raise service_error(service_error.internal,
1504                                        "Cannot create %s" % tb_dir)
1505                    else:
1506                        # XXX
1507                        self.log.error("[gateways]: Ignoring gateways for " + \
1508                                "unknown testbed %s" % self.current_gateways)
1509                        self.current_gateways = None
1510                    return True
1511                else:
1512                    return False
1513            else:
1514                m = self.end_gateways.match(line)
1515                if m :
1516                    if m.group(1) != self.current_gateways:
1517                        raise service_error(service_error.internal,
1518                                "Mismatched gateway markers!?")
1519                    if self.control_gateway:
1520                        try:
1521                            cc = open("%s/%s/client.conf" %
1522                                    (self.tmpdir, self.current_gateways), 'w')
1523                            print >>cc, "ControlGateway: %s" % \
1524                                    self.control_gateway
1525                            if tbparams[self.master].has_key('smbshare'):
1526                                print >>cc, "SMBSHare: %s" % \
1527                                        tbparams[self.master]['smbshare']
1528                            print >>cc, "ProjectUser: %s" % \
1529                                    tbparams[self.master]['user']
1530                            print >>cc, "ProjectName: %s" % \
1531                                    tbparams[self.master]['project']
1532                            print >>cc, "ExperimentID: %s/%s" % \
1533                                    ( tbparams[self.master]['project'], \
1534                                    self.eid )
1535                            cc.close()
1536                        except IOError:
1537                            raise service_error(service_error.internal,
1538                                    "Error creating client config")
1539                        # XXX: This seer specific file should disappear
1540                        try:
1541                            cc = open("%s/%s/seer.conf" %
1542                                    (self.tmpdir, self.current_gateways),
1543                                    'w')
1544                            if self.current_gateways != self.master:
1545                                print >>cc, "ControlNode: %s" % \
1546                                        self.control_gateway
1547                            print >>cc, "ExperimentID: %s/%s" % \
1548                                    ( tbparams[self.master]['project'], \
1549                                    self.eid )
1550                            cc.close()
1551                        except IOError:
1552                            raise service_error(service_error.internal,
1553                                    "Error creating seer config")
1554                    else:
1555                        debug.error("[gateways]: No control gateway for %s" %\
1556                                    self.current_gateways)
1557                    self.current_gateways = None
1558                else:
1559                    dtb, myname, desthost, type = line.split(" ")
1560
1561                    if type == "control" or type == "both":
1562                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1563                                self.eid, 
1564                                tbparams[self.current_gateways]['project'],
1565                                tbparams[self.current_gateways]['domain'])
1566                    try:
1567                        active = self.gateway_conf_file(self.current_gateways,
1568                                self.master, self.eid, self.gw_pubkey_base,
1569                                self.gw_secretkey_base,
1570                                self.active_end, tbparams, dtb, myname,
1571                                desthost, type)
1572                    except IOError, e:
1573                        raise service_error(service_error.internal,
1574                                "Failed to write config file for %s" % \
1575                                        self.current_gateway)
1576           
1577                    gw_pubkey = "%s/keys/%s" % \
1578                            (self.tmpdir, self.gw_pubkey_base)
1579                    gw_secretkey = "%s/keys/%s" % \
1580                            (self.tmpdir, self.gw_secretkey_base)
1581
1582                    pkfile = "%s/%s/%s" % \
1583                            ( self.tmpdir, self.current_gateways, 
1584                                    self.gw_pubkey_base)
1585                    skfile = "%s/%s/%s" % \
1586                            ( self.tmpdir, self.current_gateways, 
1587                                    self.gw_secretkey_base)
1588
1589                    if not os.path.exists(pkfile):
1590                        try:
1591                            self.copy_file(gw_pubkey, pkfile)
1592                        except IOError:
1593                            service_error(service_error.internal,
1594                                    "Failed to copy pubkey file")
1595
1596                    if active and not os.path.exists(skfile):
1597                        try:
1598                            self.copy_file(gw_secretkey, skfile)
1599                        except IOError:
1600                            service_error(service_error.internal,
1601                                    "Failed to copy secretkey file")
1602                return True
1603
1604    class shunt_to_file:
1605        """
1606        Simple class to write data between two regexps to a file.
1607        """
1608        def __init__(self, begin, end, filename):
1609            """
1610            Begin shunting on a match of begin, stop on end, send data to
1611            filename.
1612            """
1613            self.begin = re.compile(begin)
1614            self.end = re.compile(end)
1615            self.in_shunt = False
1616            self.file = None
1617            self.filename = filename
1618
1619        def __call__(self, line):
1620            """
1621            Call this on each line in the input that may be shunted.
1622            """
1623            if not self.in_shunt:
1624                if self.begin.match(line):
1625                    self.in_shunt = True
1626                    try:
1627                        self.file = open(self.filename, "w")
1628                    except:
1629                        self.file = None
1630                        raise
1631                    return True
1632                else:
1633                    return False
1634            else:
1635                if self.end.match(line):
1636                    if self.file: 
1637                        self.file.close()
1638                        self.file = None
1639                    self.in_shunt = False
1640                else:
1641                    if self.file:
1642                        print >>self.file, line
1643                return True
1644
1645    class shunt_to_list:
1646        """
1647        Same interface as shunt_to_file.  Data collected in self.list, one list
1648        element per line.
1649        """
1650        def __init__(self, begin, end):
1651            self.begin = re.compile(begin)
1652            self.end = re.compile(end)
1653            self.in_shunt = False
1654            self.list = [ ]
1655       
1656        def __call__(self, line):
1657            if not self.in_shunt:
1658                if self.begin.match(line):
1659                    self.in_shunt = True
1660                    return True
1661                else:
1662                    return False
1663            else:
1664                if self.end.match(line):
1665                    self.in_shunt = False
1666                else:
1667                    self.list.append(line)
1668                return True
1669
1670    class shunt_to_string:
1671        """
1672        Same interface as shunt_to_file.  Data collected in self.str, all in
1673        one string.
1674        """
1675        def __init__(self, begin, end):
1676            self.begin = re.compile(begin)
1677            self.end = re.compile(end)
1678            self.in_shunt = False
1679            self.str = ""
1680       
1681        def __call__(self, line):
1682            if not self.in_shunt:
1683                if self.begin.match(line):
1684                    self.in_shunt = True
1685                    return True
1686                else:
1687                    return False
1688            else:
1689                if self.end.match(line):
1690                    self.in_shunt = False
1691                else:
1692                    self.str += line
1693                return True
1694
1695    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1696            tbparams, tmpdir, alloc_log=None):
1697        started = { }           # Testbeds where a sub-experiment started
1698                                # successfully
1699
1700        # XXX
1701        fail_soft = False
1702
1703        log = alloc_log or self.log
1704
1705        thread_pool = self.thread_pool(self.nthreads)
1706        threads = [ ]
1707
1708        for tb in [ k for k in allocated.keys() if k != master]:
1709            # Create and start a thread to start the segment, and save it to
1710            # get the return value later
1711            thread_pool.wait_for_slot()
1712            t  = self.pooled_thread(\
1713                    target=self.start_segment(log=log,
1714                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1715                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1716                    pdata=thread_pool, trace_file=self.trace_file)
1717            threads.append(t)
1718            t.start()
1719
1720        # Wait until all finish
1721        thread_pool.wait_for_all_done()
1722
1723        # If none failed, start the master
1724        failed = [ t.getName() for t in threads if not t.rv ]
1725
1726        if len(failed) == 0:
1727            starter = self.start_segment(log=log, 
1728                    keyfile=self.ssh_privkey_file, debug=self.debug)
1729            if not starter(master, eid, tbparams, tmpdir):
1730                failed.append(master)
1731
1732        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1733        # If one failed clean up, unless fail_soft is set
1734        if failed:
1735            if not fail_soft:
1736                thread_pool.clear()
1737                for tb in succeeded:
1738                    # Create and start a thread to stop the segment
1739                    thread_pool.wait_for_slot()
1740                    t  = self.pooled_thread(\
1741                            target=self.stop_segment(log=log,
1742                                keyfile=self.ssh_privkey_file,
1743                                debug=self.debug), 
1744                            args=(tb, eid, tbparams), name=tb,
1745                            pdata=thread_pool, trace_file=self.trace_file)
1746                    t.start()
1747                # Wait until all finish
1748                thread_pool.wait_for_all_done()
1749
1750                # release the allocations
1751                for tb in tbparams.keys():
1752                    self.release_access(tb, tbparams[tb]['allocID'])
1753                # Remove the placeholder
1754                self.state_lock.acquire()
1755                self.state[eid]['experimentStatus'] = 'failed'
1756                if self.state_filename: self.write_state()
1757                self.state_lock.release()
1758
1759                #raise service_error(service_error.federant,
1760                #    "Swap in failed on %s" % ",".join(failed))
1761                log.error("Swap in failed on %s" % ",".join(failed))
1762                return
1763        else:
1764            log.info("[start_segment]: Experiment %s active" % eid)
1765
1766        log.debug("[start_experiment]: removing %s" % tmpdir)
1767
1768        # Walk up tmpdir, deleting as we go
1769        for path, dirs, files in os.walk(tmpdir, topdown=False):
1770            for f in files:
1771                os.remove(os.path.join(path, f))
1772            for d in dirs:
1773                os.rmdir(os.path.join(path, d))
1774        os.rmdir(tmpdir)
1775
1776        # Insert the experiment into our state and update the disk copy
1777        self.state_lock.acquire()
1778        self.state[expid]['experimentStatus'] = 'active'
1779        self.state[eid] = self.state[expid]
1780        if self.state_filename: self.write_state()
1781        self.state_lock.release()
1782        return
1783
1784    def create_experiment(self, req, fid):
1785        """
1786        The external interface to experiment creation called from the
1787        dispatcher.
1788
1789        Creates a working directory, splits the incoming description using the
1790        splitter script and parses out the avrious subsections using the
1791        lcasses above.  Once each sub-experiment is created, use pooled threads
1792        to instantiate them and start it all up.
1793        """
1794
1795        if not self.auth.check_attribute(fid, 'create'):
1796            raise service_error(service_error.access, "Create access denied")
1797
1798        try:
1799            tmpdir = tempfile.mkdtemp(prefix="split-")
1800        except IOError:
1801            raise service_error(service_error.internal, "Cannot create tmp dir")
1802
1803        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1804        gw_secretkey_base = "fed.%s" % self.ssh_type
1805        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1806        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1807        tclfile = tmpdir + "/experiment.tcl"
1808        tbparams = { }
1809        try:
1810            access_user = self.accessdb[fid]
1811        except KeyError:
1812            raise service_error(service_error.internal,
1813                    "Access map and authorizer out of sync in " + \
1814                            "create_experiment for fedid %s"  % fid)
1815
1816        pid = "dummy"
1817        gid = "dummy"
1818        try:
1819            os.mkdir(tmpdir+"/keys")
1820        except OSError:
1821            raise service_error(service_error.internal,
1822                    "Can't make temporary dir")
1823
1824        req = req.get('CreateRequestBody', None)
1825        if not req:
1826            raise service_error(service_error.req,
1827                    "Bad request format (no CreateRequestBody)")
1828        # The tcl parser needs to read a file so put the content into that file
1829        descr=req.get('experimentdescription', None)
1830        if descr:
1831            file_content=descr.get('ns2description', None)
1832            if file_content:
1833                try:
1834                    f = open(tclfile, 'w')
1835                    f.write(file_content)
1836                    f.close()
1837                except IOError:
1838                    raise service_error(service_error.internal,
1839                            "Cannot write temp experiment description")
1840            else:
1841                raise service_error(service_error.req, 
1842                        "Only ns2descriptions supported")
1843        else:
1844            raise service_error(service_error.req, "No experiment description")
1845
1846        # Generate an ID for the experiment (slice) and a certificate that the
1847        # allocator can use to prove they own it.  We'll ship it back through
1848        # the encrypted connection.
1849        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1850
1851        if req.has_key('experimentID') and \
1852                req['experimentID'].has_key('localname'):
1853            overwrite = False
1854            eid = req['experimentID']['localname']
1855            # If there's an old failed experiment here with the same local name
1856            # and accessible by this user, we'll overwrite it, otherwise we'll
1857            # fall through and do the collision avoidance.
1858            old_expid = self.get_experiment_fedid(eid)
1859            if old_expid and self.check_experiment_access(fid, old_expid):
1860                self.state_lock.acquire()
1861                status = self.state[eid].get('experimentStatus', None)
1862                if status and status == 'failed':
1863                    # remove the old access attribute
1864                    self.auth.unset_attribute(fid, old_expid)
1865                    overwrite = True
1866                    del self.state[eid]
1867                    del self.state[old_expid]
1868                self.state_lock.release()
1869            self.state_lock.acquire()
1870            while (self.state.has_key(eid) and not overwrite):
1871                eid += random.choice(string.ascii_letters)
1872            # Initial state
1873            self.state[eid] = {
1874                    'experimentID' : \
1875                            [ { 'localname' : eid }, {'fedid': expid } ],
1876                    'experimentStatus': 'starting',
1877                    'experimentAccess': { 'X509' : expcert },
1878                    'owner': fid,
1879                    'log' : [],
1880                }
1881            self.state[expid] = self.state[eid]
1882            if self.state_filename: self.write_state()
1883            self.state_lock.release()
1884        else:
1885            eid = self.exp_stem
1886            for i in range(0,5):
1887                eid += random.choice(string.ascii_letters)
1888            self.state_lock.acquire()
1889            while (self.state.has_key(eid)):
1890                eid = self.exp_stem
1891                for i in range(0,5):
1892                    eid += random.choice(string.ascii_letters)
1893            # Initial state
1894            self.state[eid] = {
1895                    'experimentID' : \
1896                            [ { 'localname' : eid }, {'fedid': expid } ],
1897                    'experimentStatus': 'starting',
1898                    'experimentAccess': { 'X509' : expcert },
1899                    'owner': fid,
1900                    'log' : [],
1901                }
1902            self.state[expid] = self.state[eid]
1903            if self.state_filename: self.write_state()
1904            self.state_lock.release()
1905
1906        try: 
1907            # This catches exceptions to clear the placeholder if necessary
1908            try:
1909                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1910            except ValueError:
1911                raise service_error(service_error.server_config, 
1912                        "Bad key type (%s)" % self.ssh_type)
1913
1914            user = req.get('user', None)
1915            if user == None:
1916                raise service_error(service_error.req, "No user")
1917
1918            master = req.get('master', None)
1919            if not master:
1920                raise service_error(service_error.req,
1921                        "No master testbed label")
1922            export_project = req.get('exportProject', None)
1923            if not export_project:
1924                raise service_error(service_error.req, "No export project")
1925           
1926            if self.splitter_url:
1927                self.log.debug("Calling remote splitter at %s" % \
1928                        self.splitter_url)
1929                split_data = self.remote_splitter(self.splitter_url,
1930                        file_content, master)
1931            else:
1932                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1933                    str(self.muxmax), '-m', master]
1934
1935                if self.fedkit:
1936                    tclcmd.append('-k')
1937
1938                if self.gatewaykit:
1939                    tclcmd.append('-K')
1940
1941                tclcmd.extend([pid, gid, eid, tclfile])
1942
1943                self.log.debug("running local splitter %s", " ".join(tclcmd))
1944                # This is just fantastic.  As a side effect the parser copies
1945                # tb_compat.tcl into the current directory, so that directory
1946                # must be writable by the fedd user.  Doing this in the
1947                # temporary subdir ensures this is the case.
1948                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1949                        cwd=tmpdir)
1950                split_data = tclparser.stdout
1951
1952            allocated = { }         # Testbeds we can access
1953            # Objects to parse the splitter output (defined above)
1954            parse_current_testbed = self.current_testbed(eid, tmpdir,
1955                    self.fedkit, self.gatewaykit)
1956            parse_allbeds = self.allbeds(self.get_access)
1957            parse_gateways = self.gateways(eid, master, tmpdir,
1958                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1959                    self.fedkit)
1960            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1961                        "^#\s+End\s+Vtopo")
1962            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1963                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1964            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1965                    "^#\s+End\s+tarfiles")
1966            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1967                    "^#\s+End\s+rpms")
1968
1969            # Working on the split data
1970            for line in split_data:
1971                line = line.rstrip()
1972                if parse_current_testbed(line, master, allocated, tbparams):
1973                    continue
1974                elif parse_allbeds(line, user, tbparams, master, export_project,
1975                        access_user):
1976                    continue
1977                elif parse_gateways(line, allocated, tbparams):
1978                    continue
1979                elif parse_vtopo(line):
1980                    continue
1981                elif parse_hostnames(line):
1982                    continue
1983                elif parse_tarfiles(line):
1984                    continue
1985                elif parse_rpms(line):
1986                    continue
1987                else:
1988                    raise service_error(service_error.internal, 
1989                            "Bad tcl parse? %s" % line)
1990            # Virtual topology and visualization
1991            vtopo = self.gentopo(parse_vtopo.str)
1992            if not vtopo:
1993                raise service_error(service_error.internal, 
1994                        "Failed to generate virtual topology")
1995
1996            vis = self.genviz(vtopo)
1997            if not vis:
1998                raise service_error(service_error.internal, 
1999                        "Failed to generate visualization")
2000
2001           
2002            # save federant information
2003            for k in allocated.keys():
2004                tbparams[k]['federant'] = {\
2005                        'name': [ { 'localname' : eid} ],\
2006                        'emulab': tbparams[k]['emulab'],\
2007                        'allocID' : tbparams[k]['allocID'],\
2008                        'master' : k == master,\
2009                    }
2010
2011            self.state_lock.acquire()
2012            self.state[eid]['vtopo'] = vtopo
2013            self.state[eid]['vis'] = vis
2014            self.state[expid]['federant'] = \
2015                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2016                        if tbparams[tb].has_key('federant') ]
2017            if self.state_filename: self.write_state()
2018            self.state_lock.release()
2019
2020            # Copy tarfiles and rpms needed at remote sites into a staging area
2021            try:
2022                if self.fedkit:
2023                    for t in self.fedkit:
2024                        parse_tarfiles.list.append(t[1])
2025                if self.gatewaykit:
2026                    for t in self.gatewaykit:
2027                        parse_tarfiles.list.append(t[1])
2028                for t in parse_tarfiles.list:
2029                    if not os.path.exists("%s/tarfiles" % tmpdir):
2030                        os.mkdir("%s/tarfiles" % tmpdir)
2031                    self.copy_file(t, "%s/tarfiles/%s" % \
2032                            (tmpdir, os.path.basename(t)))
2033                for r in parse_rpms.list:
2034                    if not os.path.exists("%s/rpms" % tmpdir):
2035                        os.mkdir("%s/rpms" % tmpdir)
2036                    self.copy_file(r, "%s/rpms/%s" % \
2037                            (tmpdir, os.path.basename(r)))
2038                # A null experiment file in case we need to create a remote
2039                # experiment from scratch
2040                f = open("%s/null.tcl" % tmpdir, "w")
2041                print >>f, """
2042set ns [new Simulator]
2043source tb_compat.tcl
2044
2045set a [$ns node]
2046
2047$ns rtproto Session
2048$ns run
2049"""
2050                f.close()
2051
2052            except IOError, e:
2053                raise service_error(service_error.internal, 
2054                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2055
2056        except service_error, e:
2057            # If something goes wrong in the parse (usually an access error)
2058            # clear the placeholder state.  From here on out the code delays
2059            # exceptions.  Failing at this point returns a fault to the remote
2060            # caller.
2061            self.state_lock.acquire()
2062            del self.state[eid]
2063            del self.state[expid]
2064            if self.state_filename: self.write_state()
2065            self.state_lock.release()
2066            raise e
2067
2068
2069        # Start the background swapper and return the starting state.  From
2070        # here on out, the state will stick around a while.
2071
2072        # Let users touch the state
2073        self.auth.set_attribute(fid, expid)
2074        self.auth.set_attribute(expid, expid)
2075        # Override fedids can manipulate state as well
2076        for o in self.overrides:
2077            self.auth.set_attribute(o, expid)
2078
2079        # Create a logger that logs to the experiment's state object as well as
2080        # to the main log file.
2081        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2082        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2083        # XXX: there should be a global one of these rather than repeating the
2084        # code.
2085        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2086                    '%d %b %y %H:%M:%S'))
2087        alloc_log.addHandler(h)
2088       
2089        # Start a thread to do the resource allocation
2090        t  = Thread(target=self.allocate_resources,
2091                args=(allocated, master, eid, expid, expcert, tbparams, 
2092                    tmpdir, alloc_log),
2093                name=eid)
2094        t.start()
2095
2096        rv = {
2097                'experimentID': [
2098                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2099                ],
2100                'experimentStatus': 'starting',
2101                'experimentAccess': { 'X509' : expcert }
2102            }
2103
2104        return rv
2105   
2106
2107    def new_create_experiment(self, req, fid):
2108        """
2109        The external interface to experiment creation called from the
2110        dispatcher.
2111
2112        Creates a working directory, splits the incoming description using the
2113        splitter script and parses out the avrious subsections using the
2114        lcasses above.  Once each sub-experiment is created, use pooled threads
2115        to instantiate them and start it all up.
2116        """
2117
2118        if not self.auth.check_attribute(fid, 'create'):
2119            raise service_error(service_error.access, "Create access denied")
2120
2121        try:
2122            tmpdir = tempfile.mkdtemp(prefix="split-")
2123        except IOError:
2124            raise service_error(service_error.internal, "Cannot create tmp dir")
2125
2126        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2127        gw_secretkey_base = "fed.%s" % self.ssh_type
2128        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2129        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2130        tclfile = tmpdir + "/experiment.tcl"
2131        tbparams = { }
2132        try:
2133            access_user = self.accessdb[fid]
2134        except KeyError:
2135            raise service_error(service_error.internal,
2136                    "Access map and authorizer out of sync in " + \
2137                            "create_experiment for fedid %s"  % fid)
2138
2139        pid = "dummy"
2140        gid = "dummy"
2141        try:
2142            os.mkdir(tmpdir+"/keys")
2143        except OSError:
2144            raise service_error(service_error.internal,
2145                    "Can't make temporary dir")
2146
2147        req = req.get('CreateRequestBody', None)
2148        if not req:
2149            raise service_error(service_error.req,
2150                    "Bad request format (no CreateRequestBody)")
2151        # The tcl parser needs to read a file so put the content into that file
2152        descr=req.get('experimentdescription', None)
2153        if descr:
2154            file_content=descr.get('ns2description', None)
2155            if file_content:
2156                try:
2157                    f = open(tclfile, 'w')
2158                    f.write(file_content)
2159                    f.close()
2160                except IOError:
2161                    raise service_error(service_error.internal,
2162                            "Cannot write temp experiment description")
2163            else:
2164                raise service_error(service_error.req, 
2165                        "Only ns2descriptions supported")
2166        else:
2167            raise service_error(service_error.req, "No experiment description")
2168
2169        # Generate an ID for the experiment (slice) and a certificate that the
2170        # allocator can use to prove they own it.  We'll ship it back through
2171        # the encrypted connection.
2172        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2173
2174        if req.has_key('experimentID') and \
2175                req['experimentID'].has_key('localname'):
2176            overwrite = False
2177            eid = req['experimentID']['localname']
2178            # If there's an old failed experiment here with the same local name
2179            # and accessible by this user, we'll overwrite it, otherwise we'll
2180            # fall through and do the collision avoidance.
2181            old_expid = self.get_experiment_fedid(eid)
2182            if old_expid and self.check_experiment_access(fid, old_expid):
2183                self.state_lock.acquire()
2184                status = self.state[eid].get('experimentStatus', None)
2185                if status and status == 'failed':
2186                    # remove the old access attribute
2187                    self.auth.unset_attribute(fid, old_expid)
2188                    overwrite = True
2189                    del self.state[eid]
2190                    del self.state[old_expid]
2191                self.state_lock.release()
2192            self.state_lock.acquire()
2193            while (self.state.has_key(eid) and not overwrite):
2194                eid += random.choice(string.ascii_letters)
2195            # Initial state
2196            self.state[eid] = {
2197                    'experimentID' : \
2198                            [ { 'localname' : eid }, {'fedid': expid } ],
2199                    'experimentStatus': 'starting',
2200                    'experimentAccess': { 'X509' : expcert },
2201                    'owner': fid,
2202                    'log' : [],
2203                }
2204            self.state[expid] = self.state[eid]
2205            if self.state_filename: self.write_state()
2206            self.state_lock.release()
2207        else:
2208            eid = self.exp_stem
2209            for i in range(0,5):
2210                eid += random.choice(string.ascii_letters)
2211            self.state_lock.acquire()
2212            while (self.state.has_key(eid)):
2213                eid = self.exp_stem
2214                for i in range(0,5):
2215                    eid += random.choice(string.ascii_letters)
2216            # Initial state
2217            self.state[eid] = {
2218                    'experimentID' : \
2219                            [ { 'localname' : eid }, {'fedid': expid } ],
2220                    'experimentStatus': 'starting',
2221                    'experimentAccess': { 'X509' : expcert },
2222                    'owner': fid,
2223                    'log' : [],
2224                }
2225            self.state[expid] = self.state[eid]
2226            if self.state_filename: self.write_state()
2227            self.state_lock.release()
2228
2229        try: 
2230            # This catches exceptions to clear the placeholder if necessary
2231            try:
2232                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2233            except ValueError:
2234                raise service_error(service_error.server_config, 
2235                        "Bad key type (%s)" % self.ssh_type)
2236
2237            user = req.get('user', None)
2238            if user == None:
2239                raise service_error(service_error.req, "No user")
2240
2241            master = req.get('master', None)
2242            if not master:
2243                raise service_error(service_error.req,
2244                        "No master testbed label")
2245            export_project = req.get('exportProject', None)
2246            if not export_project:
2247                raise service_error(service_error.req, "No export project")
2248           
2249            if self.splitter_url:
2250                self.log.debug("Calling remote splitter at %s" % \
2251                        self.splitter_url)
2252                split_data = self.remote_splitter(self.splitter_url,
2253                        file_content, master)
2254            else:
2255                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2256                    str(self.muxmax), '-m', master]
2257
2258                if self.fedkit:
2259                    tclcmd.append('-k')
2260
2261                if self.gatewaykit:
2262                    tclcmd.append('-K')
2263
2264                tclcmd.extend([pid, gid, eid, tclfile])
2265
2266                self.log.debug("running local splitter %s", " ".join(tclcmd))
2267                # This is just fantastic.  As a side effect the parser copies
2268                # tb_compat.tcl into the current directory, so that directory
2269                # must be writable by the fedd user.  Doing this in the
2270                # temporary subdir ensures this is the case.
2271                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2272                        cwd=tmpdir)
2273                split_data = tclparser.stdout
2274
2275            allocated = { }         # Testbeds we can access
2276# XXX here's where we're working
2277            def out_topo(filename, t):
2278                try:
2279                    f = open("/tmp/%s" % filename, "w")
2280                    print >> f, "%s" % \
2281                            topdl.topology_to_xml(t, top="experiment")
2282                    f.close()
2283                except IOError, e:
2284                    raise service_error(service_error.internal, "Can't open file")
2285
2286            try:
2287
2288                top = topdl.topology_from_xml(file=split_data, top="experiment")
2289                subs = sorted(top.substrates, 
2290                        cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
2291                        reverse=True)
2292                ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
2293                for s in subs:
2294                    a = ips.allocate(len(s.interfaces)+2)
2295                    if a :
2296                        base, num = a
2297                        if num < len(s.interfaces) +2 : 
2298                            raise service_error(service_error.internal,
2299                                    "Allocator returned wrong number of IPs??")
2300                    else:
2301                        raise service_error(service_error.req, 
2302                                "Cannot allocate IP addresses")
2303
2304                    base += 1
2305                    for i in s.interfaces:
2306                        i.attribute.append(
2307                                topdl.Attribute('ip4_address', 
2308                                    "%s" % ip_addr(base)))
2309                        base += 1
2310
2311                testbeds = set([ a.value for e in top.elements \
2312                        for a in e.attribute \
2313                            if a.attribute == 'testbed'] )
2314                topo ={ }
2315                for tb in testbeds:
2316                    self.get_access(tb, None, user, tbparams, master,
2317                            export_project, access_user)
2318                    topo[tb] = top.clone()
2319                    to_delete = [ ]
2320                    for e in topo[tb].elements:
2321                        etb = e.get_attribute('testbed')
2322                        if etb and etb != tb:
2323                            for i in e.interface:
2324                                for s in i.subs:
2325                                    try:
2326                                        s.interfaces.remove(i)
2327                                    except ValueError:
2328                                        raise service_error(service_error.internal,
2329                                                "Can't remove interface??")
2330                            to_delete.append(e)
2331                    for e in to_delete:
2332                        topo[tb].elements.remove(e)
2333                    topo[tb].make_indices()
2334
2335
2336
2337                for s in top.substrates:
2338                    tests = { }
2339                    for i in s.interfaces:
2340                        e = i.element
2341                        tb = e.get_attribute('testbed')
2342                        if tb and not tests.has_key(tb):
2343                            for i in e.interface:
2344                                if s in i.subs:
2345                                    tests[tb]= \
2346                                            i.get_attribute('ip4_address')
2347                    if len(tests) < 2:
2348                        continue
2349
2350                    # More than one testbed is on this substrate.  Insert
2351                    # some gateways into the subtopologies.
2352
2353                    for st in tests.keys():
2354                        for dt in [ t for t in tests.keys() if t != st]:
2355                            myname =  "%stunnel" % dt
2356                            desthost  =  "%stunnel" % st
2357                            sproject = tbparams[st].get('project', 'project')
2358                            dproject = tbparams[dt].get('project', 'project')
2359                            sdomain = ".%s.%s%s" % (eid, sproject,
2360                                    tbparams[st].get('domain', ".example.com"))
2361                            ddomain = ".%s.%s%s" % (eid, dproject,
2362                                    tbparams[dt].get('domain', ".example.com"))
2363                            boss = tbparams[master].get('boss', "boss")
2364                            fs = tbparams[master].get('fs', "fs")
2365                            event_server = "%s%s" % \
2366                                    (tbparams[st].get('eventserver', "event_server"),
2367                                            tbparams[dt].get('domain', "example.com"))
2368                            remote_event_server = "%s%s" % \
2369                                    (tbparams[dt].get('eventserver', "event_server"),
2370                                            tbparams[dt].get('domain', "example.com"))
2371                            seer_control = "%s%s" % \
2372                                    (tbparams[st].get('control', "control"), sdomain)
2373                            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
2374                            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
2375                            conf_file = "%s%s.gw.conf" % (myname, sdomain)
2376                            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
2377                            # translate to lower case so the `hostname` hack for specifying
2378                            # configuration files works.
2379                            conf_file = conf_file.lower();
2380                            remote_conf_file = remote_conf_file.lower();
2381                            active = ("%s" % (st == master))
2382                            portal = topdl.Computer(**{
2383                                    'name': "%stunnel" % dt,
2384                                    'attribute' : [{
2385                                        'attribute': n,
2386                                        'value': v,
2387                                        } for n, v in (\
2388                                                ('gateway', 'true'),
2389                                                ('boss', boss),
2390                                                ('fs', fs),
2391                                                ('event_server', event_server),
2392                                                ('remote_event_server', remote_event_server),
2393                                                ('seer_control', seer_control),
2394                                                ('local_key_dir', local_key_dir),
2395                                                ('remote_conf_dir', remote_conf_dir),
2396                                                ('conf_file', conf_file),
2397                                                ('remote_conf_file', remote_conf_file),
2398                                                ('remote_script_dir', "/usr/local/federation/bin"),
2399                                                ('local_script_dir', "/usr/local/federation/bin"),
2400                                                )],
2401                                    'interface': [{
2402                                        'substrate': s.name,
2403                                        'attribute': [ { 
2404                                            'attribute': 'ip4_addreess', 
2405                                            'value': tests[dt],
2406                                            }, ],
2407                                        }, ],
2408                                    })
2409                            topo[st].elements.append(portal)
2410                # Connect the gateway nodes into the topologies and clear out
2411                # substrates that are not in the topologies
2412                for tb in testbeds:
2413                    topo[tb].incorporate_elements()
2414                    topo[tb].substrates = \
2415                            [s for s in topo[tb].substrates \
2416                                if len(s.interfaces) >0]
2417
2418                softdir ="%s/software" % tmpdir
2419                softmap = { }
2420                os.mkdir(softdir)
2421                pkgs = set([fedkit, gatewaykit])
2422                pkgs.update([x.location for e in top.elements \
2423                        for x in e.software])
2424                for pkg in pkgs:
2425                    loc = pkg
2426
2427                    scheme, host, path = urlparse(loc)[0:3]
2428                    dest = os.path.basename(path)
2429                    if not scheme:
2430                        if not loc.startswith('/'):
2431                            loc = "/%s" % loc
2432                        loc = "file://%s" %loc
2433                    try:
2434                        u = urlopen(loc)
2435                    except Exception, e:
2436                        raise service_error(service_error.req, 
2437                                "Cannot open %s: %s" % (loc, e))
2438                    try:
2439                        f = open("%s/%s" % (softdir, dest) , "w")
2440                        data = u.read(4096)
2441                        while data:
2442                            f.write(data)
2443                            data = u.read(4096)
2444                        f.close()
2445                        u.close()
2446                    except Exception, e:
2447                        raise service_error(service_error.internal,
2448                                "Could not copy %s: %s" % (loc, e))
2449                    path = re.sub("/tmp", "", softdir)
2450                    # XXX
2451                    softmap[pkg] = \
2452                            "https://users.isi.deterlab.net:23232/%s/%s" %\
2453                            ( path, dest)
2454
2455                # Convert the software locations in the segments into the local
2456                # copies on this host
2457                for soft in [ s for tb in topo.values() \
2458                        for e in tb.elements \
2459                            for s in e.software ]:
2460                    if softmap.has_key(soft.location):
2461                        soft.location = softmap[soft.location]
2462                for tb in testbeds:
2463                    out_topo("%s.xml" %tb, topo[tb])
2464
2465                vtopo = topdl.topology_to_vtopo(top)
2466                vis = self.genviz(vtopo)
2467
2468            except Exception, e:
2469                traceback.print_exc()
2470                raise service_error(service_error.internal, "%s"  % e)
2471
2472
2473
2474            # Build the testbed topologies:
2475
2476
2477            if True:
2478                raise service_error(service_error.internal, "Developing")
2479
2480# XXX old code
2481            # Objects to parse the splitter output (defined above)
2482            parse_current_testbed = self.current_testbed(eid, tmpdir,
2483                    self.fedkit, self.gatewaykit)
2484            parse_allbeds = self.allbeds(self.get_access)
2485            parse_gateways = self.gateways(eid, master, tmpdir,
2486                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
2487                    self.fedkit)
2488            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
2489                        "^#\s+End\s+Vtopo")
2490            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
2491                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
2492            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
2493                    "^#\s+End\s+tarfiles")
2494            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
2495                    "^#\s+End\s+rpms")
2496
2497            # Working on the split data
2498            for line in split_data:
2499                line = line.rstrip()
2500                if parse_current_testbed(line, master, allocated, tbparams):
2501                    continue
2502                elif parse_allbeds(line, user, tbparams, master, export_project,
2503                        access_user):
2504                    continue
2505                elif parse_gateways(line, allocated, tbparams):
2506                    continue
2507                elif parse_vtopo(line):
2508                    continue
2509                elif parse_hostnames(line):
2510                    continue
2511                elif parse_tarfiles(line):
2512                    continue
2513                elif parse_rpms(line):
2514                    continue
2515                else:
2516                    raise service_error(service_error.internal, 
2517                            "Bad tcl parse? %s" % line)
2518            # Virtual topology and visualization
2519            vtopo = self.gentopo(parse_vtopo.str)
2520            if not vtopo:
2521                raise service_error(service_error.internal, 
2522                        "Failed to generate virtual topology")
2523
2524            vis = self.genviz(vtopo)
2525            if not vis:
2526                raise service_error(service_error.internal, 
2527                        "Failed to generate visualization")
2528
2529           
2530            # save federant information
2531            for k in allocated.keys():
2532                tbparams[k]['federant'] = {\
2533                        'name': [ { 'localname' : eid} ],\
2534                        'emulab': tbparams[k]['emulab'],\
2535                        'allocID' : tbparams[k]['allocID'],\
2536                        'master' : k == master,\
2537                    }
2538
2539            self.state_lock.acquire()
2540            self.state[eid]['vtopo'] = vtopo
2541            self.state[eid]['vis'] = vis
2542            self.state[expid]['federant'] = \
2543                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2544                        if tbparams[tb].has_key('federant') ]
2545            if self.state_filename: self.write_state()
2546            self.state_lock.release()
2547
2548            # Copy tarfiles and rpms needed at remote sites into a staging area
2549            try:
2550                if self.fedkit:
2551                    for t in self.fedkit:
2552                        parse_tarfiles.list.append(t[1])
2553                if self.gatewaykit:
2554                    for t in self.gatewaykit:
2555                        parse_tarfiles.list.append(t[1])
2556                for t in parse_tarfiles.list:
2557                    if not os.path.exists("%s/tarfiles" % tmpdir):
2558                        os.mkdir("%s/tarfiles" % tmpdir)
2559                    self.copy_file(t, "%s/tarfiles/%s" % \
2560                            (tmpdir, os.path.basename(t)))
2561                for r in parse_rpms.list:
2562                    if not os.path.exists("%s/rpms" % tmpdir):
2563                        os.mkdir("%s/rpms" % tmpdir)
2564                    self.copy_file(r, "%s/rpms/%s" % \
2565                            (tmpdir, os.path.basename(r)))
2566                # A null experiment file in case we need to create a remote
2567                # experiment from scratch
2568                f = open("%s/null.tcl" % tmpdir, "w")
2569                print >>f, """
2570set ns [new Simulator]
2571source tb_compat.tcl
2572
2573set a [$ns node]
2574
2575$ns rtproto Session
2576$ns run
2577"""
2578                f.close()
2579
2580            except IOError, e:
2581                raise service_error(service_error.internal, 
2582                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2583
2584        except service_error, e:
2585            # If something goes wrong in the parse (usually an access error)
2586            # clear the placeholder state.  From here on out the code delays
2587            # exceptions.  Failing at this point returns a fault to the remote
2588            # caller.
2589            self.state_lock.acquire()
2590            del self.state[eid]
2591            del self.state[expid]
2592            if self.state_filename: self.write_state()
2593            self.state_lock.release()
2594            raise e
2595
2596
2597        # Start the background swapper and return the starting state.  From
2598        # here on out, the state will stick around a while.
2599
2600        # Let users touch the state
2601        self.auth.set_attribute(fid, expid)
2602        self.auth.set_attribute(expid, expid)
2603        # Override fedids can manipulate state as well
2604        for o in self.overrides:
2605            self.auth.set_attribute(o, expid)
2606
2607        # Create a logger that logs to the experiment's state object as well as
2608        # to the main log file.
2609        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2610        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2611        # XXX: there should be a global one of these rather than repeating the
2612        # code.
2613        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2614                    '%d %b %y %H:%M:%S'))
2615        alloc_log.addHandler(h)
2616       
2617        # Start a thread to do the resource allocation
2618        t  = Thread(target=self.allocate_resources,
2619                args=(allocated, master, eid, expid, expcert, tbparams, 
2620                    tmpdir, alloc_log),
2621                name=eid)
2622        t.start()
2623
2624        rv = {
2625                'experimentID': [
2626                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2627                ],
2628                'experimentStatus': 'starting',
2629                'experimentAccess': { 'X509' : expcert }
2630            }
2631
2632        return rv
2633   
2634    def get_experiment_fedid(self, key):
2635        """
2636        find the fedid associated with the localname key in the state database.
2637        """
2638
2639        rv = None
2640        self.state_lock.acquire()
2641        if self.state.has_key(key):
2642            if isinstance(self.state[key], dict):
2643                try:
2644                    kl = [ f['fedid'] for f in \
2645                            self.state[key]['experimentID']\
2646                                if f.has_key('fedid') ]
2647                except KeyError:
2648                    self.state_lock.release()
2649                    raise service_error(service_error.internal, 
2650                            "No fedid for experiment %s when getting "+\
2651                                    "fedid(!?)" % key)
2652                if len(kl) == 1:
2653                    rv = kl[0]
2654                else:
2655                    self.state_lock.release()
2656                    raise service_error(service_error.internal, 
2657                            "multiple fedids for experiment %s when " +\
2658                                    "getting fedid(!?)" % key)
2659            else:
2660                self.state_lock.release()
2661                raise service_error(service_error.internal, 
2662                        "Unexpected state for %s" % key)
2663        self.state_lock.release()
2664        return rv
2665
2666    def check_experiment_access(self, fid, key):
2667        """
2668        Confirm that the fid has access to the experiment.  Though a request
2669        may be made in terms of a local name, the access attribute is always
2670        the experiment's fedid.
2671        """
2672        if not isinstance(key, fedid):
2673            key = self.get_experiment_fedid(key)
2674
2675        if self.auth.check_attribute(fid, key):
2676            return True
2677        else:
2678            raise service_error(service_error.access, "Access Denied")
2679
2680
2681    def get_handler(self, path, fid):
2682        print "in get_handler %s %s" % (path, fid)
2683        return ("/users/faber/test.html", "text/html")
2684
2685    def get_vtopo(self, req, fid):
2686        """
2687        Return the stored virtual topology for this experiment
2688        """
2689        rv = None
2690        state = None
2691
2692        req = req.get('VtopoRequestBody', None)
2693        if not req:
2694            raise service_error(service_error.req,
2695                    "Bad request format (no VtopoRequestBody)")
2696        exp = req.get('experiment', None)
2697        if exp:
2698            if exp.has_key('fedid'):
2699                key = exp['fedid']
2700                keytype = "fedid"
2701            elif exp.has_key('localname'):
2702                key = exp['localname']
2703                keytype = "localname"
2704            else:
2705                raise service_error(service_error.req, "Unknown lookup type")
2706        else:
2707            raise service_error(service_error.req, "No request?")
2708
2709        self.check_experiment_access(fid, key)
2710
2711        self.state_lock.acquire()
2712        if self.state.has_key(key):
2713            if self.state[key].has_key('vtopo'):
2714                rv = { 'experiment' : {keytype: key },\
2715                        'vtopo': self.state[key]['vtopo'],\
2716                    }
2717            else:
2718                state = self.state[key]['experimentStatus']
2719        self.state_lock.release()
2720
2721        if rv: return rv
2722        else: 
2723            if state:
2724                raise service_error(service_error.partial, 
2725                        "Not ready: %s" % state)
2726            else:
2727                raise service_error(service_error.req, "No such experiment")
2728
2729    def get_vis(self, req, fid):
2730        """
2731        Return the stored visualization for this experiment
2732        """
2733        rv = None
2734        state = None
2735
2736        req = req.get('VisRequestBody', None)
2737        if not req:
2738            raise service_error(service_error.req,
2739                    "Bad request format (no VisRequestBody)")
2740        exp = req.get('experiment', None)
2741        if exp:
2742            if exp.has_key('fedid'):
2743                key = exp['fedid']
2744                keytype = "fedid"
2745            elif exp.has_key('localname'):
2746                key = exp['localname']
2747                keytype = "localname"
2748            else:
2749                raise service_error(service_error.req, "Unknown lookup type")
2750        else:
2751            raise service_error(service_error.req, "No request?")
2752
2753        self.check_experiment_access(fid, key)
2754
2755        self.state_lock.acquire()
2756        if self.state.has_key(key):
2757            if self.state[key].has_key('vis'):
2758                rv =  { 'experiment' : {keytype: key },\
2759                        'vis': self.state[key]['vis'],\
2760                        }
2761            else:
2762                state = self.state[key]['experimentStatus']
2763        self.state_lock.release()
2764
2765        if rv: return rv
2766        else:
2767            if state:
2768                raise service_error(service_error.partial, 
2769                        "Not ready: %s" % state)
2770            else:
2771                raise service_error(service_error.req, "No such experiment")
2772
2773    def clean_info_response(self, rv):
2774        """
2775        Remove the information in the experiment's state object that is not in
2776        the info response.
2777        """
2778        # Remove the owner info (should always be there, but...)
2779        if rv.has_key('owner'): del rv['owner']
2780
2781        # Convert the log into the allocationLog parameter and remove the
2782        # log entry (with defensive programming)
2783        if rv.has_key('log'):
2784            rv['allocationLog'] = "".join(rv['log'])
2785            del rv['log']
2786        else:
2787            rv['allocationLog'] = ""
2788
2789        if rv['experimentStatus'] != 'active':
2790            if rv.has_key('federant'): del rv['federant']
2791        else:
2792            # remove the allocationID info from each federant
2793            for f in rv.get('federant', []):
2794                if f.has_key('allocID'): del f['allocID']
2795        return rv
2796
2797    def get_info(self, req, fid):
2798        """
2799        Return all the stored info about this experiment
2800        """
2801        rv = None
2802
2803        req = req.get('InfoRequestBody', None)
2804        if not req:
2805            raise service_error(service_error.req,
2806                    "Bad request format (no InfoRequestBody)")
2807        exp = req.get('experiment', None)
2808        if exp:
2809            if exp.has_key('fedid'):
2810                key = exp['fedid']
2811                keytype = "fedid"
2812            elif exp.has_key('localname'):
2813                key = exp['localname']
2814                keytype = "localname"
2815            else:
2816                raise service_error(service_error.req, "Unknown lookup type")
2817        else:
2818            raise service_error(service_error.req, "No request?")
2819
2820        self.check_experiment_access(fid, key)
2821
2822        # The state may be massaged by the service function that called
2823        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2824        # state.
2825        self.state_lock.acquire()
2826        if self.state.has_key(key):
2827            rv = copy.deepcopy(self.state[key])
2828        self.state_lock.release()
2829
2830        if rv:
2831            return self.clean_info_response(rv)
2832        else:
2833            raise service_error(service_error.req, "No such experiment")
2834
2835    def get_multi_info(self, req, fid):
2836        """
2837        Return all the stored info that this fedid can access
2838        """
2839        rv = { 'info': [ ] }
2840
2841        self.state_lock.acquire()
2842        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2843            self.check_experiment_access(fid, key)
2844
2845            if self.state.has_key(key):
2846                e = copy.deepcopy(self.state[key])
2847                e = self.clean_info_response(e)
2848                rv['info'].append(e)
2849        self.state_lock.release()
2850        return rv
2851
2852
2853    def terminate_experiment(self, req, fid):
2854        """
2855        Swap this experiment out on the federants and delete the shared
2856        information
2857        """
2858        tbparams = { }
2859        req = req.get('TerminateRequestBody', None)
2860        if not req:
2861            raise service_error(service_error.req,
2862                    "Bad request format (no TerminateRequestBody)")
2863        force = req.get('force', False)
2864        exp = req.get('experiment', None)
2865        if exp:
2866            if exp.has_key('fedid'):
2867                key = exp['fedid']
2868                keytype = "fedid"
2869            elif exp.has_key('localname'):
2870                key = exp['localname']
2871                keytype = "localname"
2872            else:
2873                raise service_error(service_error.req, "Unknown lookup type")
2874        else:
2875            raise service_error(service_error.req, "No request?")
2876
2877        self.check_experiment_access(fid, key)
2878
2879        dealloc_list = [ ]
2880
2881
2882        # Create a logger that logs to the dealloc_list as well as to the main
2883        # log file.
2884        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2885        h = logging.StreamHandler(self.list_log(dealloc_list))
2886        # XXX: there should be a global one of these rather than repeating the
2887        # code.
2888        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2889                    '%d %b %y %H:%M:%S'))
2890        dealloc_log.addHandler(h)
2891
2892        self.state_lock.acquire()
2893        fed_exp = self.state.get(key, None)
2894
2895        if fed_exp:
2896            # This branch of the conditional holds the lock to generate a
2897            # consistent temporary tbparams variable to deallocate experiments.
2898            # It releases the lock to do the deallocations and reacquires it to
2899            # remove the experiment state when the termination is complete.
2900
2901            # First make sure that the experiment creation is complete.
2902            status = fed_exp.get('experimentStatus', None)
2903
2904            if status:
2905                if status in ('starting', 'terminating'):
2906                    if not force:
2907                        self.state_lock.release()
2908                        raise service_error(service_error.partial, 
2909                                'Experiment still being created or destroyed')
2910                    else:
2911                        self.log.warning('Experiment in %s state ' % status + \
2912                                'being terminated by force.')
2913            else:
2914                # No status??? trouble
2915                self.state_lock.release()
2916                raise service_error(service_error.internal,
2917                        "Experiment has no status!?")
2918
2919            ids = []
2920            #  experimentID is a list of dicts that are self-describing
2921            #  identifiers.  This finds all the fedids and localnames - the
2922            #  keys of self.state - and puts them into ids.
2923            for id in fed_exp.get('experimentID', []):
2924                if id.has_key('fedid'): ids.append(id['fedid'])
2925                if id.has_key('localname'): ids.append(id['localname'])
2926
2927            # Construct enough of the tbparams to make the stop_segment calls
2928            # work
2929            for fed in fed_exp.get('federant', []):
2930                try:
2931                    for e in fed['name']:
2932                        eid = e.get('localname', None)
2933                        if eid: break
2934                    else:
2935                        continue
2936
2937                    p = fed['emulab']['project']
2938
2939                    project = p['name']['localname']
2940                    tb = p['testbed']['localname']
2941                    user = p['user'][0]['userID']['localname']
2942
2943                    domain = fed['emulab']['domain']
2944                    host  = fed['emulab']['ops']
2945                    aid = fed['allocID']
2946                except KeyError, e:
2947                    continue
2948                tbparams[tb] = {\
2949                        'user': user,\
2950                        'domain': domain,\
2951                        'project': project,\
2952                        'host': host,\
2953                        'eid': eid,\
2954                        'aid': aid,\
2955                    }
2956            fed_exp['experimentStatus'] = 'terminating'
2957            if self.state_filename: self.write_state()
2958            self.state_lock.release()
2959
2960            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2961            # then completes, so we can't wait if nothing starts.  So, no
2962            # tbparams, no start.
2963            if len(tbparams) > 0:
2964                thread_pool = self.thread_pool(self.nthreads)
2965                for tb in tbparams.keys():
2966                    # Create and start a thread to stop the segment
2967                    thread_pool.wait_for_slot()
2968                    t  = self.pooled_thread(\
2969                            target=self.stop_segment(log=dealloc_log,
2970                                keyfile=self.ssh_privkey_file, debug=self.debug), 
2971                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2972                            pdata=thread_pool, trace_file=self.trace_file)
2973                    t.start()
2974                # Wait for completions
2975                thread_pool.wait_for_all_done()
2976
2977            # release the allocations (failed experiments have done this
2978            # already, and starting experiments may be in odd states, so we
2979            # ignore errors releasing those allocations
2980            try: 
2981                for tb in tbparams.keys():
2982                    self.release_access(tb, tbparams[tb]['aid'])
2983            except service_error, e:
2984                if status != 'failed' and not force:
2985                    raise e
2986
2987            # Remove the terminated experiment
2988            self.state_lock.acquire()
2989            for id in ids:
2990                if self.state.has_key(id): del self.state[id]
2991
2992            if self.state_filename: self.write_state()
2993            self.state_lock.release()
2994
2995            return { 
2996                    'experiment': exp , 
2997                    'deallocationLog': "".join(dealloc_list),
2998                    }
2999        else:
3000            # Don't forget to release the lock
3001            self.state_lock.release()
3002            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.