source: fedd/federation/experiment_control.py @ bd3e314

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since bd3e314 was bd3e314, checked in by Ted Faber <faber@…>, 15 years ago

Asynchronous creation and logging. These are the fedd changes. Fedd_client next.

  • Property mode set to 100644
File size: 87.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254        self.state = { }
255        self.state_lock = Lock()
256        self.tclsh = "/usr/local/bin/otclsh"
257        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
258                config.get("experiment_control", "tcl_splitter",
259                        "/usr/testbed/lib/ns2ir/parse.tcl")
260        mapdb_file = config.get("experiment_control", "mapdb")
261        self.trace_file = sys.stderr
262
263        self.def_expstart = \
264                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
265                "/tmp/federate";
266        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
267                "FEDDIR/hosts";
268        self.def_gwstart = \
269                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
270                "/tmp/bridge.log";
271        self.def_mgwstart = \
272                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
273                "/tmp/bridge.log";
274        self.def_gwimage = "FBSD61-TUNNEL2";
275        self.def_gwtype = "pc";
276        self.local_access = { }
277
278        if auth:
279            self.auth = auth
280        else:
281            self.log.error(\
282                    "[access]: No authorizer initialized, creating local one.")
283            auth = authorizer()
284
285
286        if self.ssh_pubkey_file:
287            try:
288                f = open(self.ssh_pubkey_file, 'r')
289                self.ssh_pubkey = f.read()
290                f.close()
291            except IOError:
292                raise service_error(service_error.internal,
293                        "Cannot read sshpubkey")
294        else:
295            raise service_error(service_error.internal, 
296                    "No SSH public key file?")
297
298        if not self.ssh_privkey_file:
299            raise service_error(service_error.internal, 
300                    "No SSH public key file?")
301
302
303        if mapdb_file:
304            self.read_mapdb(mapdb_file)
305        else:
306            self.log.warn("[experiment_control] No testbed map, using defaults")
307            self.tbmap = { 
308                    'deter':'https://users.isi.deterlab.net:23235',
309                    'emulab':'https://users.isi.deterlab.net:23236',
310                    'ucb':'https://users.isi.deterlab.net:23237',
311                    }
312
313        if accessdb_file:
314                self.read_accessdb(accessdb_file)
315        else:
316            raise service_error(service_error.internal,
317                    "No accessdb specified in config")
318
319        # Grab saved state.  OK to do this w/o locking because it's read only
320        # and only one thread should be in existence that can see self.state at
321        # this point.
322        if self.state_filename:
323            self.read_state()
324
325        # Dispatch tables
326        self.soap_services = {\
327                'Create': soap_handler('Create', self.create_experiment),
328                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
329                'Vis': soap_handler('Vis', self.get_vis),
330                'Info': soap_handler('Info', self.get_info),
331                'Terminate': soap_handler('Terminate', 
332                    self.terminate_experiment),
333        }
334
335        self.xmlrpc_services = {\
336                'Create': xmlrpc_handler('Create', self.create_experiment),
337                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
338                'Vis': xmlrpc_handler('Vis', self.get_vis),
339                'Info': xmlrpc_handler('Info', self.get_info),
340                'Terminate': xmlrpc_handler('Terminate',
341                    self.terminate_experiment),
342        }
343
344    def copy_file(self, src, dest, size=1024):
345        """
346        Exceedingly simple file copy.
347        """
348        s = open(src,'r')
349        d = open(dest, 'w')
350
351        buf = "x"
352        while buf != "":
353            buf = s.read(size)
354            d.write(buf)
355        s.close()
356        d.close()
357
358    # Call while holding self.state_lock
359    def write_state(self):
360        """
361        Write a new copy of experiment state after copying the existing state
362        to a backup.
363
364        State format is a simple pickling of the state dictionary.
365        """
366        if os.access(self.state_filename, os.W_OK):
367            self.copy_file(self.state_filename, \
368                    "%s.bak" % self.state_filename)
369        try:
370            f = open(self.state_filename, 'w')
371            pickle.dump(self.state, f)
372        except IOError, e:
373            self.log.error("Can't write file %s: %s" % \
374                    (self.state_filename, e))
375        except pickle.PicklingError, e:
376            self.log.error("Pickling problem: %s" % e)
377        except TypeError, e:
378            self.log.error("Pickling problem (TypeError): %s" % e)
379
380    # Call while holding self.state_lock
381    def read_state(self):
382        """
383        Read a new copy of experiment state.  Old state is overwritten.
384
385        State format is a simple pickling of the state dictionary.
386        """
387        try:
388            f = open(self.state_filename, "r")
389            self.state = pickle.load(f)
390            self.log.debug("[read_state]: Read state from %s" % \
391                    self.state_filename)
392        except IOError, e:
393            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
394                    % (self.state_filename, e))
395        except pickle.UnpicklingError, e:
396            self.log.warning(("[read_state]: No saved state: " + \
397                    "Unpickling failed: %s") % e)
398       
399        for k in self.state.keys():
400            try:
401                # This list should only have one element in it, but phrasing it
402                # as a for loop doesn't cost much, really.  We have to find the
403                # fedid elements anyway.
404                for eid in [ f['fedid'] \
405                        for f in self.state[k]['experimentID']\
406                            if f.has_key('fedid') ]:
407                    self.auth.set_attribute(self.state[k]['owner'], eid)
408            except KeyError, e:
409                self.log.warning("[read_state]: State ownership or identity " +\
410                        "misformatted in %s: %s" % (self.state_filename, e))
411
412
413    def read_accessdb(self, accessdb_file):
414        """
415        Read the mapping from fedids that can create experiments to their name
416        in the 3-level access namespace.  All will be asserted from this
417        testbed and can include the local username and porject that will be
418        asserted on their behalf by this fedd.  Each fedid is also added to the
419        authorization system with the "create" attribute.
420        """
421        self.accessdb = {}
422        # These are the regexps for parsing the db
423        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
424        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
425                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
426        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
427                "\s*->\s*(" + name_expr + ")\s*$")
428        lineno = 0
429
430        # Parse the mappings and store in self.authdb, a dict of
431        # fedid -> (proj, user)
432        try:
433            f = open(accessdb_file, "r")
434            for line in f:
435                lineno += 1
436                line = line.strip()
437                if len(line) == 0 or line.startswith('#'):
438                    continue
439                m = project_line.match(line)
440                if m:
441                    fid = fedid(hexstr=m.group(1))
442                    project, user = m.group(2,3)
443                    if not self.accessdb.has_key(fid):
444                        self.accessdb[fid] = []
445                    self.accessdb[fid].append((project, user))
446                    continue
447
448                m = user_line.match(line)
449                if m:
450                    fid = fedid(hexstr=m.group(1))
451                    project = None
452                    user = m.group(2)
453                    if not self.accessdb.has_key(fid):
454                        self.accessdb[fid] = []
455                    self.accessdb[fid].append((project, user))
456                    continue
457                self.log.warn("[experiment_control] Error parsing access " +\
458                        "db %s at line %d" %  (accessdb_file, lineno))
459        except IOError:
460            raise service_error(service_error.internal,
461                    "Error opening/reading %s as experiment " +\
462                            "control accessdb" %  accessdb_file)
463        f.close()
464
465        # Initialize the authorization attributes
466        for fid in self.accessdb.keys():
467            self.auth.set_attribute(fid, 'create')
468
469    def read_mapdb(self, file):
470        """
471        Read a simple colon separated list of mappings for the
472        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
473        """
474
475        self.tbmap = { }
476        lineno =0
477        try:
478            f = open(file, "r")
479            for line in f:
480                lineno += 1
481                line = line.strip()
482                if line.startswith('#') or len(line) == 0:
483                    continue
484                try:
485                    label, url = line.split(':', 1)
486                    self.tbmap[label] = url
487                except ValueError, e:
488                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
489                            "map db: %s %s" % (lineno, line, e))
490        except IOError, e:
491            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
492                    "open %s: %s" % (file, e))
493        f.close()
494
495    class emulab_segment:
496        def __init__(self, log=None, keyfile=None, debug=False):
497            self.log = log or logging.getLogger(\
498                    'fedd.experiment_control.emulab_segment')
499            self.ssh_privkey_file = keyfile
500            self.debug = debug
501            self.ssh_exec="/usr/bin/ssh"
502            self.scp_exec = "/usr/bin/scp"
503            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
504
505        def scp_file(self, file, user, host, dest=""):
506            """
507            scp a file to the remote host.  If debug is set the action is only
508            logged.
509            """
510
511            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
512                    '-o', 'StrictHostKeyChecking yes', '-i', 
513                    self.ssh_privkey_file, file, 
514                    "%s@%s:%s" % (user, host, dest)]
515            rv = 0
516
517            try:
518                dnull = open("/dev/null", "w")
519            except IOError:
520                self.log.debug("[ssh_file]: failed to open " + \
521                        "/dev/null for redirect")
522                dnull = Null
523
524            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
525            if not self.debug:
526                rv = call(scp_cmd, stdout=dnull, stderr=dnull)
527
528            return rv == 0
529
530        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
531            """
532            Run a remote command on host as user.  If debug is set, the action is
533            only logged.
534            """
535            sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
536                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
537                    (self.ssh_exec, self.ssh_privkey_file, 
538                            user, host, cmd)
539
540            try:
541                dnull = open("/dev/null", "r")
542            except IOError:
543                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
544                        "for redirect")
545                dnull = Null
546
547            self.log.debug("[ssh_cmd]: %s" % sh_str)
548            if not self.debug:
549                if dnull:
550                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
551                else:
552                    sub = Popen(sh_str, shell=True)
553                if timeout:
554                    i = 0
555                    rv = sub.poll()
556                    while i < timeout:
557                        if rv is not None: break
558                        else:
559                            time.sleep(1)
560                            rv = sub.poll()
561                            i += 1
562                    else:
563                        self.log.debug("Process exceeded runtime: %s" % sh_str)
564                        os.kill(sub.pid, signal.SIGKILL)
565                        raise self.ssh_cmd_timeout();
566                    return rv == 0
567                else:
568                    return sub.wait() == 0
569            else:
570                return True
571
572    class start_segment(emulab_segment):
573        def __init__(self, log=None, keyfile=None, debug=False):
574            experiment_control_local.emulab_segment.__init__(self,
575                    log=log, keyfile=keyfile, debug=debug)
576
577        def create_config_tree(self, src_dir, dest_dir, script):
578            """
579            Append commands to script that will create the directory hierarchy
580            on the remote federant.
581            """
582
583            if os.path.isdir(src_dir):
584                print >>script, "mkdir -p %s" % dest_dir
585                print >>script, "chmod 770 %s" % dest_dir
586
587                for f in os.listdir(src_dir):
588                    if os.path.isdir(f):
589                        self.create_config_tree("%s/%s" % (src_dir, f), 
590                                "%s/%s" % (dest_dir, f), script)
591            else:
592                self.log.debug("[create_config_tree]: Not a directory: %s" \
593                        % src_dir)
594
595        def ship_configs(self, host, user, src_dir, dest_dir):
596            """
597            Copy federant-specific configuration files to the federant.
598            """
599            for f in os.listdir(src_dir):
600                if os.path.isdir(f):
601                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
602                            "%s/%s" % (dest_dir, f)):
603                        return False
604                else:
605                    if not self.scp_file("%s/%s" % (src_dir, f), 
606                            user, host, dest_dir):
607                        return False
608            return True
609
610        def get_state(self, user, host, tb, pid, eid):
611            # command to test experiment state
612            expinfo_exec = "/usr/testbed/bin/expinfo" 
613            # Regular expressions to parse the expinfo response
614            state_re = re.compile("State:\s+(\w+)")
615            no_exp_re = re.compile("^No\s+such\s+experiment")
616            swapping_re = re.compile("^No\s+information\s+available.")
617            state = None    # Experiment state parsed from expinfo
618            # The expinfo ssh command.  Note the identity restriction to use
619            # only the identity provided in the pubkey given.
620            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
621                    'StrictHostKeyChecking yes', '-i', 
622                    self.ssh_privkey_file, "%s@%s" % (user, host), 
623                    expinfo_exec, pid, eid]
624
625            dev_null = None
626            try:
627                dev_null = open("/dev/null", "a")
628            except IOError, e:
629                self.log.error("[get_state]: can't open /dev/null: %s" %e)
630
631            if self.debug:
632                state = 'swapped'
633                rv = 0
634            else:
635                status = Popen(cmd, stdout=PIPE, stderr=dev_null)
636                for line in status.stdout:
637                    m = state_re.match(line)
638                    if m: state = m.group(1)
639                    else:
640                        for reg, st in ((no_exp_re, "none"),
641                                (swapping_re, "swapping")):
642                            m = reg.match(line)
643                            if m: state = st
644                rv = status.wait()
645
646            # If the experiment is not present the subcommand returns a
647            # non-zero return value.  If we successfully parsed a "none"
648            # outcome, ignore the return code.
649            if rv != 0 and state != 'none':
650                raise service_error(service_error.internal,
651                        "Cannot get status of segment %s:%s/%s" % \
652                                (tb, pid, eid))
653            elif state not in ('active', 'swapped', 'swapping', 'none'):
654                raise service_error(service_error.internal,
655                        "Cannot get status of segment %s:%s/%s" % \
656                                (tb, pid, eid))
657            else: return state
658
659
660        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
661            """
662            Start a sub-experiment on a federant.
663
664            Get the current state, modify or create as appropriate, ship data
665            and configs and start the experiment.  There are small ordering
666            differences based on the initial state of the sub-experiment.
667            """
668            # ops node in the federant
669            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
670            user = tbparams[tb]['user']     # federant user
671            pid = tbparams[tb]['project']   # federant project
672            # XXX
673            base_confs = ( "hosts",)
674            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
675            # Configuration directories on the remote machine
676            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
677            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
678            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
679
680            state = self.get_state(user, host, tb, pid, eid)
681
682            self.log.debug("[start_segment]: %s: %s" % (tb, state))
683            self.log.info("[start_segment]:transferring experiment to %s" % tb)
684
685            if not self.scp_file("%s/%s/%s" % \
686                    (tmpdir, tb, tclfile), user, host):
687                return False
688           
689            if state == 'none':
690                # Create a null copy of the experiment so that we capture any
691                # logs there if the modify fails.  Emulab software discards the
692                # logs from a failed startexp
693                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
694                    return False
695                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
696                timedout = False
697                try:
698                    if not self.ssh_cmd(user, host,
699                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
700                            "-e %s null.tcl") % (pid, eid), "startexp",
701                            timeout=60 * 10):
702                        return False
703                except self.ssh_cmd_timeout:
704                    timedout = True
705
706                if timedout:
707                    state = self.get_state(user, host, self.ssh_privkey_file, 
708                            tb, eid, pid)
709                    if state != "swapped":
710                        return False
711
712           
713            # Open up a temporary file to contain a script for setting up the
714            # filespace for the new experiment.
715            self.log.info("[start_segment]: creating script file")
716            try:
717                sf, scriptname = tempfile.mkstemp()
718                scriptfile = os.fdopen(sf, 'w')
719            except IOError:
720                return False
721
722            scriptbase = os.path.basename(scriptname)
723
724            # Script the filesystem changes
725            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
726            # Clear and create the tarfiles and rpm directories
727            for d in (tarfiles_dir, rpms_dir):
728                print >>scriptfile, "/bin/rm -rf %s/*" % d
729                print >>scriptfile, "mkdir -p %s" % d
730            print >>scriptfile, 'mkdir -p %s' % proj_dir
731            self.create_config_tree("%s/%s" % (tmpdir, tb),
732                    proj_dir, scriptfile)
733            if os.path.isdir("%s/tarfiles" % tmpdir):
734                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
735                        scriptfile)
736            if os.path.isdir("%s/rpms" % tmpdir):
737                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
738                        scriptfile)
739            print >>scriptfile, "rm -f %s" % scriptbase
740            scriptfile.close()
741
742            # Move the script to the remote machine
743            # XXX: could collide tempfile names on the remote host
744            if self.scp_file(scriptname, user, host, scriptbase):
745                os.remove(scriptname)
746            else:
747                return False
748
749            # Execute the script (and the script's last line deletes it)
750            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
751                return False
752
753            for f in base_confs:
754                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
755                        "%s/%s" % (proj_dir, f)):
756                    return False
757            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
758                    proj_dir):
759                return False
760            if os.path.isdir("%s/tarfiles" % tmpdir):
761                if not self.ship_configs(host, user,
762                        "%s/tarfiles" % tmpdir, tarfiles_dir):
763                    return False
764            if os.path.isdir("%s/rpms" % tmpdir):
765                if not self.ship_configs(host, user,
766                        "%s/rpms" % tmpdir, tarfiles_dir):
767                    return False
768            # Stage the new configuration (active experiments will stay swapped
769            # in now)
770            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
771            try:
772                if not self.ssh_cmd(user, host,
773                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
774                                (pid, eid, tclfile),
775                        "modexp", timeout= 60 * 10):
776                    return False
777            except self.ssh_cmd_timeout:
778                print "modexp timeout"
779                # There's really no way to see if this succeeded or failed, so
780                # if it hangs, assume the worst.
781                return False
782            # Active experiments are still swapped, this swaps the others in.
783            if state != 'active':
784                self.log.info("[start_segment]: Swapping %s in on %s" % \
785                        (eid, tb))
786                timedout = False
787                try:
788                    if not self.ssh_cmd(user, host,
789                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
790                            "swapexp", timeout=10*60):
791                        return False
792                except self.ssh_cmd_timeout:
793                    timedout = True
794               
795                # If the command was terminated, but completed successfully,
796                # report success.
797                if timedout:
798                    state = self.get_state(user, host, self.ssh_privkey_file,
799                            tb, eid, pid)
800                    self.log.debug("[start_segment]: swapin timed out (state)")
801                    return state == 'active'
802            # Everything has gone OK.
803            return True
804
805    class stop_segment(emulab_segment):
806        def __init__(self, log=None, keyfile=None, debug=False):
807            experiment_control_local.emulab_segment.__init__(self,
808                    log=log, keyfile=keyfile, debug=debug)
809
810        def __call__(self, tb, eid, tbparams):
811            """
812            Stop a sub experiment by calling swapexp on the federant
813            """
814            user = tbparams[tb]['user']
815            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
816            pid = tbparams[tb]['project']
817
818            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
819            return self.ssh_cmd(user, host,
820                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
821
822       
823    def generate_ssh_keys(self, dest, type="rsa" ):
824        """
825        Generate a set of keys for the gateways to use to talk.
826
827        Keys are of type type and are stored in the required dest file.
828        """
829        valid_types = ("rsa", "dsa")
830        t = type.lower();
831        if t not in valid_types: raise ValueError
832        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
833
834        try:
835            trace = open("/dev/null", "w")
836        except IOError:
837            raise service_error(service_error.internal,
838                    "Cannot open /dev/null??");
839
840        # May raise CalledProcessError
841        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
842        rv = call(cmd, stdout=trace, stderr=trace)
843        if rv != 0:
844            raise service_error(service_error.internal, 
845                    "Cannot generate nonce ssh keys.  %s return code %d" \
846                            % (self.ssh_keygen, rv))
847
848    def gentopo(self, str):
849        """
850        Generate the topology dtat structure from the splitter's XML
851        representation of it.
852
853        The topology XML looks like:
854            <experiment>
855                <nodes>
856                    <node><vname></vname><ips>ip1:ip2</ips></node>
857                </nodes>
858                <lans>
859                    <lan>
860                        <vname></vname><vnode></vnode><ip></ip>
861                        <bandwidth></bandwidth><member>node:port</member>
862                    </lan>
863                </lans>
864        """
865        class topo_parse:
866            """
867            Parse the topology XML and create the dats structure.
868            """
869            def __init__(self):
870                # Typing of the subelements for data conversion
871                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
872                self.int_subelements = ( 'bandwidth',)
873                self.float_subelements = ( 'delay',)
874                # The final data structure
875                self.nodes = [ ]
876                self.lans =  [ ]
877                self.topo = { \
878                        'node': self.nodes,\
879                        'lan' : self.lans,\
880                    }
881                self.element = { }  # Current element being created
882                self.chars = ""     # Last text seen
883
884            def end_element(self, name):
885                # After each sub element the contents is added to the current
886                # element or to the appropriate list.
887                if name == 'node':
888                    self.nodes.append(self.element)
889                    self.element = { }
890                elif name == 'lan':
891                    self.lans.append(self.element)
892                    self.element = { }
893                elif name in self.str_subelements:
894                    self.element[name] = self.chars
895                    self.chars = ""
896                elif name in self.int_subelements:
897                    self.element[name] = int(self.chars)
898                    self.chars = ""
899                elif name in self.float_subelements:
900                    self.element[name] = float(self.chars)
901                    self.chars = ""
902
903            def found_chars(self, data):
904                self.chars += data.rstrip()
905
906
907        tp = topo_parse();
908        parser = xml.parsers.expat.ParserCreate()
909        parser.EndElementHandler = tp.end_element
910        parser.CharacterDataHandler = tp.found_chars
911
912        parser.Parse(str)
913
914        return tp.topo
915       
916
917    def genviz(self, topo):
918        """
919        Generate the visualization the virtual topology
920        """
921
922        neato = "/usr/local/bin/neato"
923        # These are used to parse neato output and to create the visualization
924        # file.
925        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
926        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
927                "%s</type></node>"
928
929        try:
930            # Node names
931            nodes = [ n['vname'] for n in topo['node'] ]
932            topo_lans = topo['lan']
933        except KeyError:
934            raise service_error(service_error.internal, "Bad topology")
935
936        lans = { }
937        links = { }
938
939        # Walk through the virtual topology, organizing the connections into
940        # 2-node connections (links) and more-than-2-node connections (lans).
941        # When a lan is created, it's added to the list of nodes (there's a
942        # node in the visualization for the lan).
943        for l in topo_lans:
944            if links.has_key(l['vname']):
945                if len(links[l['vname']]) < 2:
946                    links[l['vname']].append(l['vnode'])
947                else:
948                    nodes.append(l['vname'])
949                    lans[l['vname']] = links[l['vname']]
950                    del links[l['vname']]
951                    lans[l['vname']].append(l['vnode'])
952            elif lans.has_key(l['vname']):
953                lans[l['vname']].append(l['vnode'])
954            else:
955                links[l['vname']] = [ l['vnode'] ]
956
957
958        # Open up a temporary file for dot to turn into a visualization
959        try:
960            df, dotname = tempfile.mkstemp()
961            dotfile = os.fdopen(df, 'w')
962        except IOError:
963            raise service_error(service_error.internal,
964                    "Failed to open file in genviz")
965
966        # Generate a dot/neato input file from the links, nodes and lans
967        try:
968            print >>dotfile, "graph G {"
969            for n in nodes:
970                print >>dotfile, '\t"%s"' % n
971            for l in links.keys():
972                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
973            for l in lans.keys():
974                for n in lans[l]:
975                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
976            print >>dotfile, "}"
977            dotfile.close()
978        except TypeError:
979            raise service_error(service_error.internal,
980                    "Single endpoint link in vtopo")
981        except IOError:
982            raise service_error(service_error.internal, "Cannot write dot file")
983
984        # Use dot to create a visualization
985        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
986                '-Gpack=true', dotname], stdout=PIPE)
987
988        # Translate dot to vis format
989        vis_nodes = [ ]
990        vis = { 'node': vis_nodes }
991        for line in dot.stdout:
992            m = vis_re.match(line)
993            if m:
994                vn = m.group(1)
995                vis_node = {'name': vn, \
996                        'x': float(m.group(2)),\
997                        'y' : float(m.group(3)),\
998                    }
999                if vn in links.keys() or vn in lans.keys():
1000                    vis_node['type'] = 'lan'
1001                else:
1002                    vis_node['type'] = 'node'
1003                vis_nodes.append(vis_node)
1004        rv = dot.wait()
1005
1006        os.remove(dotname)
1007        if rv == 0 : return vis
1008        else: return None
1009
1010    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1011            access_user):
1012        """
1013        Get access to testbed through fedd and set the parameters for that tb
1014        """
1015        uri = self.tbmap.get(tb, None)
1016        if not uri:
1017            raise service_error(serice_error.server_config, 
1018                    "Unknown testbed: %s" % tb)
1019
1020        # currently this lumps all users into one service access group
1021        service_keys = [ a for u in user \
1022                for a in u.get('access', []) \
1023                    if a.has_key('sshPubkey')]
1024
1025        if len(service_keys) == 0:
1026            raise service_error(service_error.req, 
1027                    "Must have at least one SSH pubkey for services")
1028
1029
1030        for p, u in access_user:
1031            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1032                    "to %s") %  ((p or "None"), u, uri))
1033
1034            if p:
1035                # Request with user and project specified
1036                req = {\
1037                        'destinationTestbed' : { 'uri' : uri },
1038                        'project': { 
1039                            'name': {'localname': p},
1040                            'user': [ {'userID': { 'localname': u } } ],
1041                            },
1042                        'user':  user,
1043                        'allocID' : { 'localname': 'test' },
1044                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1045                        'serviceAccess' : service_keys
1046                    }
1047            else:
1048                # Request with only user specified
1049                req = {\
1050                        'destinationTestbed' : { 'uri' : uri },
1051                        'user':  [ {'userID': { 'localname': u } } ],
1052                        'allocID' : { 'localname': 'test' },
1053                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1054                        'serviceAccess' : service_keys
1055                    }
1056
1057            if tb == master:
1058                # NB, the export_project parameter is a dict that includes
1059                # the type
1060                req['exportProject'] = export_project
1061
1062            # node resources if any
1063            if nodes != None and len(nodes) > 0:
1064                rnodes = [ ]
1065                for n in nodes:
1066                    rn = { }
1067                    image, hw, count = n.split(":")
1068                    if image: rn['image'] = [ image ]
1069                    if hw: rn['hardware'] = [ hw ]
1070                    if count and int(count) >0 : rn['count'] = int(count)
1071                    rnodes.append(rn)
1072                req['resources']= { }
1073                req['resources']['node'] = rnodes
1074
1075            try:
1076                if self.local_access.has_key(uri):
1077                    # Local access call
1078                    req = { 'RequestAccessRequestBody' : req }
1079                    r = self.local_access[uri].RequestAccess(req, 
1080                            fedid(file=self.cert_file))
1081                    r = { 'RequestAccessResponseBody' : r }
1082                else:
1083                    r = self.call_RequestAccess(uri, req, 
1084                            self.cert_file, self.cert_pwd, self.trusted_certs)
1085            except service_error, e:
1086                if e.code == service_error.access:
1087                    self.log.debug("[get_access] Access denied")
1088                    r = None
1089                    continue
1090                else:
1091                    raise e
1092
1093            if r.has_key('RequestAccessResponseBody'):
1094                # Through to here we have a valid response, not a fault.
1095                # Access denied is a fault, so something better or worse than
1096                # access denied has happened.
1097                r = r['RequestAccessResponseBody']
1098                self.log.debug("[get_access] Access granted")
1099                break
1100            else:
1101                raise service_error(service_error.protocol,
1102                        "Bad proxy response")
1103       
1104        if not r:
1105            raise service_error(service_error.access, 
1106                    "Access denied by %s (%s)" % (tb, uri))
1107
1108        e = r['emulab']
1109        p = e['project']
1110        tbparam[tb] = { 
1111                "boss": e['boss'],
1112                "host": e['ops'],
1113                "domain": e['domain'],
1114                "fs": e['fileServer'],
1115                "eventserver": e['eventServer'],
1116                "project": unpack_id(p['name']),
1117                "emulab" : e,
1118                "allocID" : r['allocID'],
1119                }
1120        # Make the testbed name be the label the user applied
1121        p['testbed'] = {'localname': tb }
1122
1123        for u in p['user']:
1124            role = u.get('role', None)
1125            if role == 'experimentCreation':
1126                tbparam[tb]['user'] = unpack_id(u['userID'])
1127                break
1128        else:
1129            raise service_error(service_error.internal, 
1130                    "No createExperimentUser from %s" %tb)
1131
1132        # Add attributes to barameter space.  We don't allow attributes to
1133        # overlay any parameters already installed.
1134        for a in e['fedAttr']:
1135            try:
1136                if a['attribute'] and isinstance(a['attribute'], basestring)\
1137                        and not tbparam[tb].has_key(a['attribute'].lower()):
1138                    tbparam[tb][a['attribute'].lower()] = a['value']
1139            except KeyError:
1140                self.log.error("Bad attribute in response: %s" % a)
1141       
1142    def release_access(self, tb, aid):
1143        """
1144        Release access to testbed through fedd
1145        """
1146
1147        uri = self.tbmap.get(tb, None)
1148        if not uri:
1149            raise service_error(serice_error.server_config, 
1150                    "Unknown testbed: %s" % tb)
1151
1152        if self.local_access.has_key(uri):
1153            resp = self.local_access[uri].ReleaseAccess(\
1154                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1155                    fedid(file=self.cert_file))
1156            resp = { 'ReleaseAccessResponseBody': resp } 
1157        else:
1158            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1159                    self.cert_file, self.cert_pwd, self.trusted_certs)
1160
1161        # better error coding
1162
1163    def remote_splitter(self, uri, desc, master):
1164
1165        req = {
1166                'description' : { 'ns2description': desc },
1167                'master': master,
1168                'include_fedkit': bool(self.fedkit),
1169                'include_gatewaykit': bool(self.gatewaykit)
1170            }
1171
1172        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1173                self.trusted_certs)
1174
1175        if r.has_key('Ns2SplitResponseBody'):
1176            r = r['Ns2SplitResponseBody']
1177            if r.has_key('output'):
1178                return r['output'].splitlines()
1179            else:
1180                raise service_error(service_error.protocol, 
1181                        "Bad splitter response (no output)")
1182        else:
1183            raise service_error(service_error.protocol, "Bad splitter response")
1184       
1185    class current_testbed:
1186        """
1187        Object for collecting the current testbed description.  The testbed
1188        description is saved to a file with the local testbed variables
1189        subsittuted line by line.
1190        """
1191        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1192            def tar_list_to_string(tl):
1193                if tl is None: return None
1194
1195                rv = ""
1196                for t in tl:
1197                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1198                            (t[0], os.path.basename(t[1]))
1199                return rv
1200
1201
1202            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1203            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1204            self.current_testbed = None
1205            self.testbed_file = None
1206
1207            self.def_expstart = \
1208                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1209            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1210            self.def_gwstart = \
1211                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1212            self.def_mgwstart = \
1213                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1214            self.def_gwimage = "FBSD61-TUNNEL2";
1215            self.def_gwtype = "pc";
1216            self.def_mgwcmd = '# '
1217            self.def_mgwcmdparams = ''
1218            self.def_gwcmd = '# '
1219            self.def_gwcmdparams = ''
1220
1221            self.eid = eid
1222            self.tmpdir = tmpdir
1223            # Convert fedkit and gateway kit (which are lists of tuples) into a
1224            # substituition string.
1225            self.fedkit = tar_list_to_string(fedkit)
1226            self.gatewaykit = tar_list_to_string(gatewaykit)
1227
1228        def __call__(self, line, master, allocated, tbparams):
1229            # Capture testbed topology descriptions
1230            if self.current_testbed == None:
1231                m = self.begin_testbed.match(line)
1232                if m != None:
1233                    self.current_testbed = m.group(1)
1234                    if self.current_testbed == None:
1235                        raise service_error(service_error.req,
1236                                "Bad request format (unnamed testbed)")
1237                    allocated[self.current_testbed] = \
1238                            allocated.get(self.current_testbed,0) + 1
1239                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1240                    if not os.path.exists(tb_dir):
1241                        try:
1242                            os.mkdir(tb_dir)
1243                        except IOError:
1244                            raise service_error(service_error.internal,
1245                                    "Cannot create %s" % tb_dir)
1246                    try:
1247                        self.testbed_file = open("%s/%s.%s.tcl" %
1248                                (tb_dir, self.eid, self.current_testbed), 'w')
1249                    except IOError:
1250                        self.testbed_file = None
1251                    return True
1252                else: return False
1253            else:
1254                m = self.end_testbed.match(line)
1255                if m != None:
1256                    if m.group(1) != self.current_testbed:
1257                        raise service_error(service_error.internal, 
1258                                "Mismatched testbed markers!?")
1259                    if self.testbed_file != None: 
1260                        self.testbed_file.close()
1261                        self.testbed_file = None
1262                    self.current_testbed = None
1263                elif self.testbed_file:
1264                    # Substitute variables and put the line into the local
1265                    # testbed file.
1266                    gwtype = tbparams[self.current_testbed].get(\
1267                            'connectortype', self.def_gwtype)
1268                    gwimage = tbparams[self.current_testbed].get(\
1269                            'connectorimage', self.def_gwimage)
1270                    mgwstart = tbparams[self.current_testbed].get(\
1271                            'masterconnectorstartcmd', self.def_mgwstart)
1272                    mexpstart = tbparams[self.current_testbed].get(\
1273                            'masternodestartcmd', self.def_mexpstart)
1274                    gwstart = tbparams[self.current_testbed].get(\
1275                            'slaveconnectorstartcmd', self.def_gwstart)
1276                    expstart = tbparams[self.current_testbed].get(\
1277                            'slavenodestartcmd', self.def_expstart)
1278                    project = tbparams[self.current_testbed].get('project')
1279                    gwcmd = tbparams[self.current_testbed].get(\
1280                            'slaveconnectorcmd', self.def_gwcmd)
1281                    gwcmdparams = tbparams[self.current_testbed].get(\
1282                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1283                    mgwcmd = tbparams[self.current_testbed].get(\
1284                            'masterconnectorcmd', self.def_gwcmd)
1285                    mgwcmdparams = tbparams[self.current_testbed].get(\
1286                            'masterconnectorcmdparams', self.def_gwcmdparams)
1287                    line = re.sub("GWTYPE", gwtype, line)
1288                    line = re.sub("GWIMAGE", gwimage, line)
1289                    if self.current_testbed == master:
1290                        line = re.sub("GWSTART", mgwstart, line)
1291                        line = re.sub("EXPSTART", mexpstart, line)
1292                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1293                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1294                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1295                    else:
1296                        line = re.sub("GWSTART", gwstart, line)
1297                        line = re.sub("EXPSTART", expstart, line)
1298                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1299                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1300                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1301                    #These expansions contain EID and PROJDIR.  NB these are
1302                    # local fedkit and gatewaykit, which are strings.
1303                    if self.fedkit:
1304                        line = re.sub("FEDKIT", self.fedkit, line)
1305                    if self.gatewaykit:
1306                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1307                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1308                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1309                    line = re.sub("EID", self.eid, line)
1310                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1311                            (project, self.eid), line)
1312                    print >>self.testbed_file, line
1313                return True
1314
1315    class allbeds:
1316        """
1317        Process the Allbeds section.  Get access to each federant and save the
1318        parameters in tbparams
1319        """
1320        def __init__(self, get_access):
1321            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1322            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1323            self.in_allbeds = False
1324            self.get_access = get_access
1325
1326        def __call__(self, line, user, tbparams, master, export_project,
1327                access_user):
1328            # Testbed access parameters
1329            if not self.in_allbeds:
1330                if self.begin_allbeds.match(line):
1331                    self.in_allbeds = True
1332                    return True
1333                else:
1334                    return False
1335            else:
1336                if self.end_allbeds.match(line):
1337                    self.in_allbeds = False
1338                else:
1339                    nodes = line.split('|')
1340                    tb = nodes.pop(0)
1341                    self.get_access(tb, nodes, user, tbparams, master,
1342                            export_project, access_user)
1343                return True
1344
1345    class gateways:
1346        def __init__(self, eid, master, tmpdir, gw_pubkey,
1347                gw_secretkey, copy_file, fedkit):
1348            self.begin_gateways = \
1349                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1350            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1351            self.current_gateways = None
1352            self.control_gateway = None
1353            self.active_end = { }
1354
1355            self.eid = eid
1356            self.master = master
1357            self.tmpdir = tmpdir
1358            self.gw_pubkey_base = gw_pubkey
1359            self.gw_secretkey_base = gw_secretkey
1360
1361            self.copy_file = copy_file
1362            self.fedkit = fedkit
1363
1364
1365        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1366                active_end, tbparams, dtb, myname, desthost, type):
1367            """
1368            Produce a gateway configuration file from a gateways line.
1369            """
1370
1371            sproject = tbparams[gw].get('project', 'project')
1372            dproject = tbparams[dtb].get('project', 'project')
1373            sdomain = ".%s.%s%s" % (eid, sproject,
1374                    tbparams[gw].get('domain', ".example.com"))
1375            ddomain = ".%s.%s%s" % (eid, dproject,
1376                    tbparams[dtb].get('domain', ".example.com"))
1377            boss = tbparams[master].get('boss', "boss")
1378            fs = tbparams[master].get('fs', "fs")
1379            event_server = "%s%s" % \
1380                    (tbparams[gw].get('eventserver', "event_server"),
1381                            tbparams[gw].get('domain', "example.com"))
1382            remote_event_server = "%s%s" % \
1383                    (tbparams[dtb].get('eventserver', "event_server"),
1384                            tbparams[dtb].get('domain', "example.com"))
1385            seer_control = "%s%s" % \
1386                    (tbparams[gw].get('control', "control"), sdomain)
1387            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1388
1389            if self.fedkit:
1390                remote_script_dir = "/usr/local/federation/bin"
1391                local_script_dir = "/usr/local/federation/bin"
1392            else:
1393                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1394                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1395
1396            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1397            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1398            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1399
1400            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1401            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1402
1403            # translate to lower case so the `hostname` hack for specifying
1404            # configuration files works.
1405            conf_file = conf_file.lower();
1406            remote_conf_file = remote_conf_file.lower();
1407
1408            if dtb == master:
1409                active = "false"
1410            elif gw == master:
1411                active = "true"
1412            elif active_end.has_key('%s-%s' % (dtb, gw)):
1413                active = "false"
1414            else:
1415                active_end['%s-%s' % (gw, dtb)] = 1
1416                active = "true"
1417
1418            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1419            print >>gwconfig, "Active: %s" % active
1420            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1421            if tunnel_iface:
1422                print >>gwconfig, "Interface: %s" % tunnel_iface
1423            print >>gwconfig, "BossName: %s" % boss
1424            print >>gwconfig, "FsName: %s" % fs
1425            print >>gwconfig, "EventServerName: %s" % event_server
1426            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1427            print >>gwconfig, "SeerControl: %s" % seer_control
1428            print >>gwconfig, "Type: %s" % type
1429            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1430            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1431                    local_script_dir
1432            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1433            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1434            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1435                    (remote_conf_dir, remote_conf_file)
1436            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1437            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1438            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1439            gwconfig.close()
1440
1441            return active == "true"
1442
1443        def __call__(self, line, allocated, tbparams):
1444            # Process gateways
1445            if not self.current_gateways:
1446                m = self.begin_gateways.match(line)
1447                if m:
1448                    self.current_gateways = m.group(1)
1449                    if allocated.has_key(self.current_gateways):
1450                        # This test should always succeed
1451                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1452                        if not os.path.exists(tb_dir):
1453                            try:
1454                                os.mkdir(tb_dir)
1455                            except IOError:
1456                                raise service_error(service_error.internal,
1457                                        "Cannot create %s" % tb_dir)
1458                    else:
1459                        # XXX
1460                        self.log.error("[gateways]: Ignoring gateways for " + \
1461                                "unknown testbed %s" % self.current_gateways)
1462                        self.current_gateways = None
1463                    return True
1464                else:
1465                    return False
1466            else:
1467                m = self.end_gateways.match(line)
1468                if m :
1469                    if m.group(1) != self.current_gateways:
1470                        raise service_error(service_error.internal,
1471                                "Mismatched gateway markers!?")
1472                    if self.control_gateway:
1473                        try:
1474                            cc = open("%s/%s/client.conf" %
1475                                    (self.tmpdir, self.current_gateways), 'w')
1476                            print >>cc, "ControlGateway: %s" % \
1477                                    self.control_gateway
1478                            if tbparams[self.master].has_key('smbshare'):
1479                                print >>cc, "SMBSHare: %s" % \
1480                                        tbparams[self.master]['smbshare']
1481                            print >>cc, "ProjectUser: %s" % \
1482                                    tbparams[self.master]['user']
1483                            print >>cc, "ProjectName: %s" % \
1484                                    tbparams[self.master]['project']
1485                            print >>cc, "ExperimentID: %s/%s" % \
1486                                    ( tbparams[self.master]['project'], \
1487                                    self.eid )
1488                            cc.close()
1489                        except IOError:
1490                            raise service_error(service_error.internal,
1491                                    "Error creating client config")
1492                        # XXX: This seer specific file should disappear
1493                        try:
1494                            cc = open("%s/%s/seer.conf" %
1495                                    (self.tmpdir, self.current_gateways),
1496                                    'w')
1497                            if self.current_gateways != self.master:
1498                                print >>cc, "ControlNode: %s" % \
1499                                        self.control_gateway
1500                            print >>cc, "ExperimentID: %s/%s" % \
1501                                    ( tbparams[self.master]['project'], \
1502                                    self.eid )
1503                            cc.close()
1504                        except IOError:
1505                            raise service_error(service_error.internal,
1506                                    "Error creating seer config")
1507                    else:
1508                        debug.error("[gateways]: No control gateway for %s" %\
1509                                    self.current_gateways)
1510                    self.current_gateways = None
1511                else:
1512                    dtb, myname, desthost, type = line.split(" ")
1513
1514                    if type == "control" or type == "both":
1515                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1516                                self.eid, 
1517                                tbparams[self.current_gateways]['project'],
1518                                tbparams[self.current_gateways]['domain'])
1519                    try:
1520                        active = self.gateway_conf_file(self.current_gateways,
1521                                self.master, self.eid, self.gw_pubkey_base,
1522                                self.gw_secretkey_base,
1523                                self.active_end, tbparams, dtb, myname,
1524                                desthost, type)
1525                    except IOError, e:
1526                        raise service_error(service_error.internal,
1527                                "Failed to write config file for %s" % \
1528                                        self.current_gateway)
1529           
1530                    gw_pubkey = "%s/keys/%s" % \
1531                            (self.tmpdir, self.gw_pubkey_base)
1532                    gw_secretkey = "%s/keys/%s" % \
1533                            (self.tmpdir, self.gw_secretkey_base)
1534
1535                    pkfile = "%s/%s/%s" % \
1536                            ( self.tmpdir, self.current_gateways, 
1537                                    self.gw_pubkey_base)
1538                    skfile = "%s/%s/%s" % \
1539                            ( self.tmpdir, self.current_gateways, 
1540                                    self.gw_secretkey_base)
1541
1542                    if not os.path.exists(pkfile):
1543                        try:
1544                            self.copy_file(gw_pubkey, pkfile)
1545                        except IOError:
1546                            service_error(service_error.internal,
1547                                    "Failed to copy pubkey file")
1548
1549                    if active and not os.path.exists(skfile):
1550                        try:
1551                            self.copy_file(gw_secretkey, skfile)
1552                        except IOError:
1553                            service_error(service_error.internal,
1554                                    "Failed to copy secretkey file")
1555                return True
1556
1557    class shunt_to_file:
1558        """
1559        Simple class to write data between two regexps to a file.
1560        """
1561        def __init__(self, begin, end, filename):
1562            """
1563            Begin shunting on a match of begin, stop on end, send data to
1564            filename.
1565            """
1566            self.begin = re.compile(begin)
1567            self.end = re.compile(end)
1568            self.in_shunt = False
1569            self.file = None
1570            self.filename = filename
1571
1572        def __call__(self, line):
1573            """
1574            Call this on each line in the input that may be shunted.
1575            """
1576            if not self.in_shunt:
1577                if self.begin.match(line):
1578                    self.in_shunt = True
1579                    try:
1580                        self.file = open(self.filename, "w")
1581                    except:
1582                        self.file = None
1583                        raise
1584                    return True
1585                else:
1586                    return False
1587            else:
1588                if self.end.match(line):
1589                    if self.file: 
1590                        self.file.close()
1591                        self.file = None
1592                    self.in_shunt = False
1593                else:
1594                    if self.file:
1595                        print >>self.file, line
1596                return True
1597
1598    class shunt_to_list:
1599        """
1600        Same interface as shunt_to_file.  Data collected in self.list, one list
1601        element per line.
1602        """
1603        def __init__(self, begin, end):
1604            self.begin = re.compile(begin)
1605            self.end = re.compile(end)
1606            self.in_shunt = False
1607            self.list = [ ]
1608       
1609        def __call__(self, line):
1610            if not self.in_shunt:
1611                if self.begin.match(line):
1612                    self.in_shunt = True
1613                    return True
1614                else:
1615                    return False
1616            else:
1617                if self.end.match(line):
1618                    self.in_shunt = False
1619                else:
1620                    self.list.append(line)
1621                return True
1622
1623    class shunt_to_string:
1624        """
1625        Same interface as shunt_to_file.  Data collected in self.str, all in
1626        one string.
1627        """
1628        def __init__(self, begin, end):
1629            self.begin = re.compile(begin)
1630            self.end = re.compile(end)
1631            self.in_shunt = False
1632            self.str = ""
1633       
1634        def __call__(self, line):
1635            if not self.in_shunt:
1636                if self.begin.match(line):
1637                    self.in_shunt = True
1638                    return True
1639                else:
1640                    return False
1641            else:
1642                if self.end.match(line):
1643                    self.in_shunt = False
1644                else:
1645                    self.str += line
1646                return True
1647
1648    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1649            tbparams, tmpdir, alloc_log=None):
1650        started = { }           # Testbeds where a sub-experiment started
1651                                # successfully
1652
1653        # XXX
1654        fail_soft = False
1655
1656        log = alloc_log or self.log
1657
1658        thread_pool = self.thread_pool(self.nthreads)
1659        threads = [ ]
1660
1661        for tb in [ k for k in allocated.keys() if k != master]:
1662            # Create and start a thread to start the segment, and save it to
1663            # get the return value later
1664            thread_pool.wait_for_slot()
1665            t  = self.pooled_thread(\
1666                    target=self.start_segment(log=log,
1667                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1668                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1669                    pdata=thread_pool, trace_file=self.trace_file)
1670            threads.append(t)
1671            t.start()
1672
1673        # Wait until all finish
1674        thread_pool.wait_for_all_done()
1675
1676        # If none failed, start the master
1677        failed = [ t.getName() for t in threads if not t.rv ]
1678
1679        if len(failed) == 0:
1680            starter = self.start_segment(log=log, 
1681                    keyfile=self.ssh_privkey_file, debug=self.debug)
1682            if not starter(master, eid, tbparams, tmpdir):
1683                failed.append(master)
1684
1685        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1686        # If one failed clean up, unless fail_soft is set
1687        if failed:
1688            if not fail_soft:
1689                thread_pool.clear()
1690                for tb in succeeded:
1691                    # Create and start a thread to stop the segment
1692                    thread_pool.wait_for_slot()
1693                    t  = self.pooled_thread(\
1694                            target=self.stop_segment(log=log,
1695                                keyfile=self.ssh_privkey_file,
1696                                debug=self.debug), 
1697                            args=(tb, eid, tbparams), name=tb,
1698                            pdata=thread_pool, trace_file=self.trace_file)
1699                    t.start()
1700                # Wait until all finish
1701                thread_pool.wait_for_all_done()
1702
1703                # release the allocations
1704                for tb in tbparams.keys():
1705                    self.release_access(tb, tbparams[tb]['allocID'])
1706                # Remove the placeholder
1707                self.state_lock.acquire()
1708                self.state[eid]['experimentStatus'] = 'failed'
1709                if self.state_filename: self.write_state()
1710                self.state_lock.release()
1711
1712                #raise service_error(service_error.federant,
1713                #    "Swap in failed on %s" % ",".join(failed))
1714                log.error("Swap in failed on %s" % ",".join(failed))
1715                return
1716        else:
1717            log.info("[start_segment]: Experiment %s active" % eid)
1718
1719        log.debug("[start_experiment]: removing %s" % tmpdir)
1720
1721        # Walk up tmpdir, deleting as we go
1722        for path, dirs, files in os.walk(tmpdir, topdown=False):
1723            for f in files:
1724                os.remove(os.path.join(path, f))
1725            for d in dirs:
1726                os.rmdir(os.path.join(path, d))
1727        os.rmdir(tmpdir)
1728
1729        # Insert the experiment into our state and update the disk copy
1730        self.state_lock.acquire()
1731        self.state[expid]['experimentStatus'] = 'active'
1732        self.state[eid] = self.state[expid]
1733        if self.state_filename: self.write_state()
1734        self.state_lock.release()
1735        return
1736
1737    def create_experiment(self, req, fid):
1738        """
1739        The external interface to experiment creation called from the
1740        dispatcher.
1741
1742        Creates a working directory, splits the incoming description using the
1743        splitter script and parses out the avrious subsections using the
1744        lcasses above.  Once each sub-experiment is created, use pooled threads
1745        to instantiate them and start it all up.
1746        """
1747
1748        if not self.auth.check_attribute(fid, 'create'):
1749            raise service_error(service_error.access, "Create access denied")
1750
1751        try:
1752            tmpdir = tempfile.mkdtemp(prefix="split-")
1753        except IOError:
1754            raise service_error(service_error.internal, "Cannot create tmp dir")
1755
1756        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1757        gw_secretkey_base = "fed.%s" % self.ssh_type
1758        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1759        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1760        tclfile = tmpdir + "/experiment.tcl"
1761        tbparams = { }
1762        try:
1763            access_user = self.accessdb[fid]
1764        except KeyError:
1765            raise service_error(service_error.internal,
1766                    "Access map and authorizer out of sync in " + \
1767                            "create_experiment for fedid %s"  % fid)
1768
1769        pid = "dummy"
1770        gid = "dummy"
1771        try:
1772            os.mkdir(tmpdir+"/keys")
1773        except OSError:
1774            raise service_error(service_error.internal,
1775                    "Can't make temporary dir")
1776
1777        req = req.get('CreateRequestBody', None)
1778        if not req:
1779            raise service_error(service_error.req,
1780                    "Bad request format (no CreateRequestBody)")
1781        # The tcl parser needs to read a file so put the content into that file
1782        descr=req.get('experimentdescription', None)
1783        if descr:
1784            file_content=descr.get('ns2description', None)
1785            if file_content:
1786                try:
1787                    f = open(tclfile, 'w')
1788                    f.write(file_content)
1789                    f.close()
1790                except IOError:
1791                    raise service_error(service_error.internal,
1792                            "Cannot write temp experiment description")
1793            else:
1794                raise service_error(service_error.req, 
1795                        "Only ns2descriptions supported")
1796        else:
1797            raise service_error(service_error.req, "No experiment description")
1798
1799        # Generate an ID for the experiment (slice) and a certificate that the
1800        # allocator can use to prove they own it.  We'll ship it back through
1801        # the encrypted connection.
1802        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1803
1804        if req.has_key('experimentID') and \
1805                req['experimentID'].has_key('localname'):
1806            eid = req['experimentID']['localname']
1807            self.state_lock.acquire()
1808            while (self.state.has_key(eid)):
1809                eid += random.choice(string.ascii_letters)
1810            # Initial state
1811            self.state[eid] = {
1812                    'experimentID' : \
1813                            [ { 'localname' : eid }, {'fedid': expid } ],
1814                    'experimentStatus': 'starting',
1815                    'experimentAccess': { 'X509' : expcert },
1816                    'owner': fid,
1817                    'log' : [],
1818                }
1819            self.state[expid] = self.state[eid]
1820            if self.state_filename: self.write_state()
1821            self.state_lock.release()
1822        else:
1823            eid = self.exp_stem
1824            for i in range(0,5):
1825                eid += random.choice(string.ascii_letters)
1826            self.state_lock.acquire()
1827            while (self.state.has_key(eid)):
1828                eid = self.exp_stem
1829                for i in range(0,5):
1830                    eid += random.choice(string.ascii_letters)
1831            # Initial state
1832            self.state[eid] = {
1833                    'experimentID' : \
1834                            [ { 'localname' : eid }, {'fedid': expid } ],
1835                    'experimentStatus': 'starting',
1836                    'experimentAccess': { 'X509' : expcert },
1837                    'owner': fid,
1838                    'log' : [],
1839                }
1840            self.state[expid] = self.state[eid]
1841            if self.state_filename: self.write_state()
1842            self.state_lock.release()
1843
1844        try: 
1845            # This catches exceptions to clear the placeholder if necessary
1846            try:
1847                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1848            except ValueError:
1849                raise service_error(service_error.server_config, 
1850                        "Bad key type (%s)" % self.ssh_type)
1851
1852            user = req.get('user', None)
1853            if user == None:
1854                raise service_error(service_error.req, "No user")
1855
1856            master = req.get('master', None)
1857            if not master:
1858                raise service_error(service_error.req,
1859                        "No master testbed label")
1860            export_project = req.get('exportProject', None)
1861            if not export_project:
1862                raise service_error(service_error.req, "No export project")
1863           
1864            if self.splitter_url:
1865                self.log.debug("Calling remote splitter at %s" % \
1866                        self.splitter_url)
1867                split_data = self.remote_splitter(self.splitter_url,
1868                        file_content, master)
1869            else:
1870                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1871                    str(self.muxmax), '-m', master]
1872
1873                if self.fedkit:
1874                    tclcmd.append('-k')
1875
1876                if self.gatewaykit:
1877                    tclcmd.append('-K')
1878
1879                tclcmd.extend([pid, gid, eid, tclfile])
1880
1881                self.log.debug("running local splitter %s", " ".join(tclcmd))
1882                tclparser = Popen(tclcmd, stdout=PIPE)
1883                split_data = tclparser.stdout
1884
1885            allocated = { }         # Testbeds we can access
1886            # Objects to parse the splitter output (defined above)
1887            parse_current_testbed = self.current_testbed(eid, tmpdir,
1888                    self.fedkit, self.gatewaykit)
1889            parse_allbeds = self.allbeds(self.get_access)
1890            parse_gateways = self.gateways(eid, master, tmpdir,
1891                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1892                    self.fedkit)
1893            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1894                        "^#\s+End\s+Vtopo")
1895            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1896                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1897            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1898                    "^#\s+End\s+tarfiles")
1899            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1900                    "^#\s+End\s+rpms")
1901
1902            # Working on the split data
1903            for line in split_data:
1904                line = line.rstrip()
1905                if parse_current_testbed(line, master, allocated, tbparams):
1906                    continue
1907                elif parse_allbeds(line, user, tbparams, master, export_project,
1908                        access_user):
1909                    continue
1910                elif parse_gateways(line, allocated, tbparams):
1911                    continue
1912                elif parse_vtopo(line):
1913                    continue
1914                elif parse_hostnames(line):
1915                    continue
1916                elif parse_tarfiles(line):
1917                    continue
1918                elif parse_rpms(line):
1919                    continue
1920                else:
1921                    raise service_error(service_error.internal, 
1922                            "Bad tcl parse? %s" % line)
1923            # Virtual topology and visualization
1924            vtopo = self.gentopo(parse_vtopo.str)
1925            if not vtopo:
1926                raise service_error(service_error.internal, 
1927                        "Failed to generate virtual topology")
1928
1929            vis = self.genviz(vtopo)
1930            if not vis:
1931                raise service_error(service_error.internal, 
1932                        "Failed to generate visualization")
1933
1934           
1935            # save federant information
1936            for k in allocated.keys():
1937                tbparams[k]['federant'] = {\
1938                        'name': [ { 'localname' : eid} ],\
1939                        'emulab': tbparams[k]['emulab'],\
1940                        'allocID' : tbparams[k]['allocID'],\
1941                        'master' : k == master,\
1942                    }
1943
1944            self.state_lock.acquire()
1945            self.state[eid]['vtopo'] = vtopo
1946            self.state[eid]['vis'] = vis
1947            self.state[expid]['federant'] = \
1948                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1949                        if tbparams[tb].has_key('federant') ]
1950            if self.state_filename: self.write_state()
1951            self.state_lock.release()
1952
1953            # Copy tarfiles and rpms needed at remote sites into a staging area
1954            try:
1955                if self.fedkit:
1956                    for t in self.fedkit:
1957                        parse_tarfiles.list.append(t[1])
1958                if self.gatewaykit:
1959                    for t in self.gatewaykit:
1960                        parse_tarfiles.list.append(t[1])
1961                for t in parse_tarfiles.list:
1962                    if not os.path.exists("%s/tarfiles" % tmpdir):
1963                        os.mkdir("%s/tarfiles" % tmpdir)
1964                    self.copy_file(t, "%s/tarfiles/%s" % \
1965                            (tmpdir, os.path.basename(t)))
1966                for r in parse_rpms.list:
1967                    if not os.path.exists("%s/rpms" % tmpdir):
1968                        os.mkdir("%s/rpms" % tmpdir)
1969                    self.copy_file(r, "%s/rpms/%s" % \
1970                            (tmpdir, os.path.basename(r)))
1971                # A null experiment file in case we need to create a remote
1972                # experiment from scratch
1973                f = open("%s/null.tcl" % tmpdir, "w")
1974                print >>f, """
1975set ns [new Simulator]
1976source tb_compat.tcl
1977
1978set a [$ns node]
1979
1980$ns rtproto Session
1981$ns run
1982"""
1983                f.close()
1984
1985            except IOError, e:
1986                raise service_error(service_error.internal, 
1987                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1988
1989        except service_error, e:
1990            # If something goes wrong in the parse (usually an access error)
1991            # clear the placeholder state.  From here on out the code delays
1992            # exceptions.  Failing at this point returns a fault to the remote
1993            # caller.
1994            self.state_lock.acquire()
1995            del self.state[eid]
1996            del self.state[expid]
1997            if self.state_filename: self.write_state()
1998            self.state_lock.release()
1999            raise e
2000
2001
2002        # Start the background swapper and return the starting state.  From
2003        # here on out, the state will stick around a while.
2004
2005        # Let users touch the state
2006        self.auth.set_attribute(fid, expid)
2007        self.auth.set_attribute(expid, expid)
2008
2009        # Create a logger that logs to the experiment's state object as well as
2010        # to the main log file.
2011
2012        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2013        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2014        # XXX: there should be a global one of these rather than repeating the
2015        # code.
2016        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2017                    '%d %b %y %H:%M:%S'))
2018        alloc_log.addHandler(h)
2019       
2020
2021
2022
2023
2024        # Start a thread to do the resource allocation
2025        t  = Thread(target=self.allocate_resources,
2026                args=(allocated, master, eid, expid, expcert, tbparams, 
2027                    tmpdir, alloc_log),
2028                name=eid)
2029        t.start()
2030
2031        rv = {
2032                'experimentID': [
2033                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2034                ],
2035                'experimentStatus': 'started',
2036                'experimentAccess': { 'X509' : expcert }
2037            }
2038
2039        return rv
2040
2041    def check_experiment_access(self, fid, key):
2042        """
2043        Confirm that the fid has access to the experiment.  Though a request
2044        may be made in terms of a local name, the access attribute is always
2045        the experiment's fedid.
2046        """
2047        if not isinstance(key, fedid):
2048            self.state_lock.acquire()
2049            if self.state.has_key(key):
2050                if isinstance(self.state[key], dict):
2051                    try:
2052                        kl = [ f['fedid'] for f in \
2053                                self.state[key]['experimentID']\
2054                                    if f.has_key('fedid') ]
2055                    except KeyError:
2056                        self.state_lock.release()
2057                        raise service_error(service_error.internal, 
2058                                "No fedid for experiment %s when checking " +\
2059                                        "access(!?)" % key)
2060                    if len(kl) == 1:
2061                        key = kl[0]
2062                    else:
2063                        self.state_lock.release()
2064                        raise service_error(service_error.internal, 
2065                                "multiple fedids for experiment %s when " +\
2066                                        "checking access(!?)" % key)
2067                elif isinstance(self.state[key], str):
2068                    self.state_lock.release()
2069                    raise service_error(service_error.internal, 
2070                            ("experiment %s is placeholder.  " +\
2071                                    "Creation in progress or aborted oddly") \
2072                                    % key)
2073                else:
2074                    self.state_lock.release()
2075                    raise service_error(service_error.internal, 
2076                            "Unexpected state for %s" % key)
2077
2078            else:
2079                self.state_lock.release()
2080                raise service_error(service_error.access, "Access Denied")
2081            self.state_lock.release()
2082
2083        if self.auth.check_attribute(fid, key):
2084            return True
2085        else:
2086            raise service_error(service_error.access, "Access Denied")
2087
2088
2089
2090    def get_vtopo(self, req, fid):
2091        """
2092        Return the stored virtual topology for this experiment
2093        """
2094        rv = None
2095        state = None
2096
2097        req = req.get('VtopoRequestBody', None)
2098        if not req:
2099            raise service_error(service_error.req,
2100                    "Bad request format (no VtopoRequestBody)")
2101        exp = req.get('experiment', None)
2102        if exp:
2103            if exp.has_key('fedid'):
2104                key = exp['fedid']
2105                keytype = "fedid"
2106            elif exp.has_key('localname'):
2107                key = exp['localname']
2108                keytype = "localname"
2109            else:
2110                raise service_error(service_error.req, "Unknown lookup type")
2111        else:
2112            raise service_error(service_error.req, "No request?")
2113
2114        self.check_experiment_access(fid, key)
2115
2116        self.state_lock.acquire()
2117        if self.state.has_key(key):
2118            if self.state[key].has_key('vtopo'):
2119                rv = { 'experiment' : {keytype: key },\
2120                        'vtopo': self.state[key]['vtopo'],\
2121                    }
2122            else:
2123                state = self.state[key]['experimentStatus']
2124        self.state_lock.release()
2125
2126        if rv: return rv
2127        else: 
2128            if state:
2129                raise service_error(service_error.partial, 
2130                        "Not ready: %s" % state)
2131            else:
2132                raise service_error(service_error.req, "No such experiment")
2133
2134    def get_vis(self, req, fid):
2135        """
2136        Return the stored visualization for this experiment
2137        """
2138        rv = None
2139        state = None
2140
2141        req = req.get('VisRequestBody', None)
2142        if not req:
2143            raise service_error(service_error.req,
2144                    "Bad request format (no VisRequestBody)")
2145        exp = req.get('experiment', None)
2146        if exp:
2147            if exp.has_key('fedid'):
2148                key = exp['fedid']
2149                keytype = "fedid"
2150            elif exp.has_key('localname'):
2151                key = exp['localname']
2152                keytype = "localname"
2153            else:
2154                raise service_error(service_error.req, "Unknown lookup type")
2155        else:
2156            raise service_error(service_error.req, "No request?")
2157
2158        self.check_experiment_access(fid, key)
2159
2160        self.state_lock.acquire()
2161        if self.state.has_key(key):
2162            if self.state[key].has_key('vis'):
2163                rv =  { 'experiment' : {keytype: key },\
2164                        'vis': self.state[key]['vis'],\
2165                        }
2166            else:
2167                state = self.state[key]['experimentStatus']
2168        self.state_lock.release()
2169
2170        if rv: return rv
2171        else:
2172            if state:
2173                raise service_error(service_error.partial, 
2174                        "Not ready: %s" % state)
2175            else:
2176                raise service_error(service_error.req, "No such experiment")
2177
2178    def get_info(self, req, fid):
2179        """
2180        Return all the stored info about this experiment
2181        """
2182        rv = None
2183
2184        req = req.get('InfoRequestBody', None)
2185        if not req:
2186            raise service_error(service_error.req,
2187                    "Bad request format (no VisRequestBody)")
2188        exp = req.get('experiment', None)
2189        if exp:
2190            if exp.has_key('fedid'):
2191                key = exp['fedid']
2192                keytype = "fedid"
2193            elif exp.has_key('localname'):
2194                key = exp['localname']
2195                keytype = "localname"
2196            else:
2197                raise service_error(service_error.req, "Unknown lookup type")
2198        else:
2199            raise service_error(service_error.req, "No request?")
2200
2201        self.check_experiment_access(fid, key)
2202
2203        # The state may be massaged by the service function that called
2204        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2205        # state.
2206        self.state_lock.acquire()
2207        if self.state.has_key(key):
2208            rv = copy.deepcopy(self.state[key])
2209        self.state_lock.release()
2210
2211        if rv:
2212            # Remove the owner info (should always be there, but...)
2213            if rv.has_key('owner'): del rv['owner']
2214
2215            # Convert the log into the allocationLog parameter and remove the
2216            # log entry (with defensive programming)
2217            if rv.has_key('log'):
2218                rv['allocationLog'] = "".join(rv['log'])
2219                del rv['log']
2220
2221            if rv['experimentStatus'] != 'active':
2222                if rv.has_key('federant'): del rv['federant']
2223            else:
2224                # remove the allocationID info from each federant
2225                for f in rv.get('federant', []):
2226                    if f.has_key('allocID'): del f['allocID']
2227
2228            return rv
2229        else:
2230            raise service_error(service_error.req, "No such experiment")
2231
2232
2233    def terminate_experiment(self, req, fid):
2234        """
2235        Swap this experiment out on the federants and delete the shared
2236        information
2237        """
2238        tbparams = { }
2239        req = req.get('TerminateRequestBody', None)
2240        if not req:
2241            raise service_error(service_error.req,
2242                    "Bad request format (no TerminateRequestBody)")
2243        exp = req.get('experiment', None)
2244        if exp:
2245            if exp.has_key('fedid'):
2246                key = exp['fedid']
2247                keytype = "fedid"
2248            elif exp.has_key('localname'):
2249                key = exp['localname']
2250                keytype = "localname"
2251            else:
2252                raise service_error(service_error.req, "Unknown lookup type")
2253        else:
2254            raise service_error(service_error.req, "No request?")
2255
2256        self.check_experiment_access(fid, key)
2257
2258        self.state_lock.acquire()
2259        fed_exp = self.state.get(key, None)
2260
2261        if fed_exp:
2262            # This branch of the conditional holds the lock to generate a
2263            # consistent temporary tbparams variable to deallocate experiments.
2264            # It releases the lock to do the deallocations and reacquires it to
2265            # remove the experiment state when the termination is complete.
2266
2267            # First make sure that the experiment creation is complete.
2268            if fed_exp.has_key('experimentStatus'):
2269                if fed_exp['experimentStatus'] == 'started':
2270                    self.state_lock.release()
2271                    raise service_error(service_error.partial, 
2272                            'Experiment still being created')
2273            else:
2274                # No status??? trouble
2275                self.state_lock.release()
2276                raise service_error(service_error.internal,
2277                        "Experiment has no status!?")
2278
2279            ids = []
2280            #  experimentID is a list of dicts that are self-describing
2281            #  identifiers.  This finds all the fedids and localnames - the
2282            #  keys of self.state - and puts them into ids.
2283            for id in fed_exp.get('experimentID', []):
2284                if id.has_key('fedid'): ids.append(id['fedid'])
2285                if id.has_key('localname'): ids.append(id['localname'])
2286
2287            # Construct enough of the tbparams to make the stop_segment calls
2288            # work
2289            for fed in fed_exp.get('federant', []):
2290                try:
2291                    for e in fed['name']:
2292                        eid = e.get('localname', None)
2293                        if eid: break
2294                    else:
2295                        continue
2296
2297                    p = fed['emulab']['project']
2298
2299                    project = p['name']['localname']
2300                    tb = p['testbed']['localname']
2301                    user = p['user'][0]['userID']['localname']
2302
2303                    domain = fed['emulab']['domain']
2304                    host  = fed['emulab']['ops']
2305                    aid = fed['allocID']
2306                except KeyError, e:
2307                    continue
2308                tbparams[tb] = {\
2309                        'user': user,\
2310                        'domain': domain,\
2311                        'project': project,\
2312                        'host': host,\
2313                        'eid': eid,\
2314                        'aid': aid,\
2315                    }
2316            self.state_lock.release()
2317
2318            # Stop everyone.
2319            thread_pool = self.thread_pool(self.nthreads)
2320            for tb in tbparams.keys():
2321                # Create and start a thread to stop the segment
2322                thread_pool.wait_for_slot()
2323                t  = self.pooled_thread(\
2324                        target=self.stop_segment(log=self.log,
2325                            keyfile=self.ssh_privkey_file, debug=self.debug), 
2326                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2327                        pdata=thread_pool, trace_file=self.trace_file)
2328                t.start()
2329            # Wait for completions
2330            thread_pool.wait_for_all_done()
2331
2332            # release the allocations
2333            for tb in tbparams.keys():
2334                self.release_access(tb, tbparams[tb]['aid'])
2335
2336            # Remove the terminated experiment
2337            self.state_lock.acquire()
2338            for id in ids:
2339                if self.state.has_key(id): del self.state[id]
2340
2341            if self.state_filename: self.write_state()
2342            self.state_lock.release()
2343
2344            return { 'experiment': exp }
2345        else:
2346            # Don't forget to release the lock
2347            self.state_lock.release()
2348            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.