source: fedd/federation/experiment_control.py @ 65f3f29

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 65f3f29 was 65f3f29, checked in by Ted Faber <faber@…>, 15 years ago

Added multi info functions and some helpers to fedd_client

  • Property mode set to 100644
File size: 88.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254        self.state = { }
255        self.state_lock = Lock()
256        self.tclsh = "/usr/local/bin/otclsh"
257        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
258                config.get("experiment_control", "tcl_splitter",
259                        "/usr/testbed/lib/ns2ir/parse.tcl")
260        mapdb_file = config.get("experiment_control", "mapdb")
261        self.trace_file = sys.stderr
262
263        self.def_expstart = \
264                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
265                "/tmp/federate";
266        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
267                "FEDDIR/hosts";
268        self.def_gwstart = \
269                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
270                "/tmp/bridge.log";
271        self.def_mgwstart = \
272                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
273                "/tmp/bridge.log";
274        self.def_gwimage = "FBSD61-TUNNEL2";
275        self.def_gwtype = "pc";
276        self.local_access = { }
277
278        if auth:
279            self.auth = auth
280        else:
281            self.log.error(\
282                    "[access]: No authorizer initialized, creating local one.")
283            auth = authorizer()
284
285
286        if self.ssh_pubkey_file:
287            try:
288                f = open(self.ssh_pubkey_file, 'r')
289                self.ssh_pubkey = f.read()
290                f.close()
291            except IOError:
292                raise service_error(service_error.internal,
293                        "Cannot read sshpubkey")
294        else:
295            raise service_error(service_error.internal, 
296                    "No SSH public key file?")
297
298        if not self.ssh_privkey_file:
299            raise service_error(service_error.internal, 
300                    "No SSH public key file?")
301
302
303        if mapdb_file:
304            self.read_mapdb(mapdb_file)
305        else:
306            self.log.warn("[experiment_control] No testbed map, using defaults")
307            self.tbmap = { 
308                    'deter':'https://users.isi.deterlab.net:23235',
309                    'emulab':'https://users.isi.deterlab.net:23236',
310                    'ucb':'https://users.isi.deterlab.net:23237',
311                    }
312
313        if accessdb_file:
314                self.read_accessdb(accessdb_file)
315        else:
316            raise service_error(service_error.internal,
317                    "No accessdb specified in config")
318
319        # Grab saved state.  OK to do this w/o locking because it's read only
320        # and only one thread should be in existence that can see self.state at
321        # this point.
322        if self.state_filename:
323            self.read_state()
324
325        # Dispatch tables
326        self.soap_services = {\
327                'Create': soap_handler('Create', self.create_experiment),
328                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
329                'Vis': soap_handler('Vis', self.get_vis),
330                'Info': soap_handler('Info', self.get_info),
331                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
332                'Terminate': soap_handler('Terminate', 
333                    self.terminate_experiment),
334        }
335
336        self.xmlrpc_services = {\
337                'Create': xmlrpc_handler('Create', self.create_experiment),
338                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
339                'Vis': xmlrpc_handler('Vis', self.get_vis),
340                'Info': xmlrpc_handler('Info', self.get_info),
341                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
342                'Terminate': xmlrpc_handler('Terminate',
343                    self.terminate_experiment),
344        }
345
346    def copy_file(self, src, dest, size=1024):
347        """
348        Exceedingly simple file copy.
349        """
350        s = open(src,'r')
351        d = open(dest, 'w')
352
353        buf = "x"
354        while buf != "":
355            buf = s.read(size)
356            d.write(buf)
357        s.close()
358        d.close()
359
360    # Call while holding self.state_lock
361    def write_state(self):
362        """
363        Write a new copy of experiment state after copying the existing state
364        to a backup.
365
366        State format is a simple pickling of the state dictionary.
367        """
368        if os.access(self.state_filename, os.W_OK):
369            self.copy_file(self.state_filename, \
370                    "%s.bak" % self.state_filename)
371        try:
372            f = open(self.state_filename, 'w')
373            pickle.dump(self.state, f)
374        except IOError, e:
375            self.log.error("Can't write file %s: %s" % \
376                    (self.state_filename, e))
377        except pickle.PicklingError, e:
378            self.log.error("Pickling problem: %s" % e)
379        except TypeError, e:
380            self.log.error("Pickling problem (TypeError): %s" % e)
381
382    # Call while holding self.state_lock
383    def read_state(self):
384        """
385        Read a new copy of experiment state.  Old state is overwritten.
386
387        State format is a simple pickling of the state dictionary.
388        """
389        try:
390            f = open(self.state_filename, "r")
391            self.state = pickle.load(f)
392            self.log.debug("[read_state]: Read state from %s" % \
393                    self.state_filename)
394        except IOError, e:
395            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
396                    % (self.state_filename, e))
397        except pickle.UnpicklingError, e:
398            self.log.warning(("[read_state]: No saved state: " + \
399                    "Unpickling failed: %s") % e)
400       
401        for k in self.state.keys():
402            try:
403                # This list should only have one element in it, but phrasing it
404                # as a for loop doesn't cost much, really.  We have to find the
405                # fedid elements anyway.
406                for eid in [ f['fedid'] \
407                        for f in self.state[k]['experimentID']\
408                            if f.has_key('fedid') ]:
409                    self.auth.set_attribute(self.state[k]['owner'], eid)
410            except KeyError, e:
411                self.log.warning("[read_state]: State ownership or identity " +\
412                        "misformatted in %s: %s" % (self.state_filename, e))
413
414
415    def read_accessdb(self, accessdb_file):
416        """
417        Read the mapping from fedids that can create experiments to their name
418        in the 3-level access namespace.  All will be asserted from this
419        testbed and can include the local username and porject that will be
420        asserted on their behalf by this fedd.  Each fedid is also added to the
421        authorization system with the "create" attribute.
422        """
423        self.accessdb = {}
424        # These are the regexps for parsing the db
425        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
426        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
427                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
428        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
429                "\s*->\s*(" + name_expr + ")\s*$")
430        lineno = 0
431
432        # Parse the mappings and store in self.authdb, a dict of
433        # fedid -> (proj, user)
434        try:
435            f = open(accessdb_file, "r")
436            for line in f:
437                lineno += 1
438                line = line.strip()
439                if len(line) == 0 or line.startswith('#'):
440                    continue
441                m = project_line.match(line)
442                if m:
443                    fid = fedid(hexstr=m.group(1))
444                    project, user = m.group(2,3)
445                    if not self.accessdb.has_key(fid):
446                        self.accessdb[fid] = []
447                    self.accessdb[fid].append((project, user))
448                    continue
449
450                m = user_line.match(line)
451                if m:
452                    fid = fedid(hexstr=m.group(1))
453                    project = None
454                    user = m.group(2)
455                    if not self.accessdb.has_key(fid):
456                        self.accessdb[fid] = []
457                    self.accessdb[fid].append((project, user))
458                    continue
459                self.log.warn("[experiment_control] Error parsing access " +\
460                        "db %s at line %d" %  (accessdb_file, lineno))
461        except IOError:
462            raise service_error(service_error.internal,
463                    "Error opening/reading %s as experiment " +\
464                            "control accessdb" %  accessdb_file)
465        f.close()
466
467        # Initialize the authorization attributes
468        for fid in self.accessdb.keys():
469            self.auth.set_attribute(fid, 'create')
470
471    def read_mapdb(self, file):
472        """
473        Read a simple colon separated list of mappings for the
474        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
475        """
476
477        self.tbmap = { }
478        lineno =0
479        try:
480            f = open(file, "r")
481            for line in f:
482                lineno += 1
483                line = line.strip()
484                if line.startswith('#') or len(line) == 0:
485                    continue
486                try:
487                    label, url = line.split(':', 1)
488                    self.tbmap[label] = url
489                except ValueError, e:
490                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
491                            "map db: %s %s" % (lineno, line, e))
492        except IOError, e:
493            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
494                    "open %s: %s" % (file, e))
495        f.close()
496
497    class emulab_segment:
498        def __init__(self, log=None, keyfile=None, debug=False):
499            self.log = log or logging.getLogger(\
500                    'fedd.experiment_control.emulab_segment')
501            self.ssh_privkey_file = keyfile
502            self.debug = debug
503            self.ssh_exec="/usr/bin/ssh"
504            self.scp_exec = "/usr/bin/scp"
505            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
506
507        def scp_file(self, file, user, host, dest=""):
508            """
509            scp a file to the remote host.  If debug is set the action is only
510            logged.
511            """
512
513            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
514                    '-o', 'StrictHostKeyChecking yes', '-i', 
515                    self.ssh_privkey_file, file, 
516                    "%s@%s:%s" % (user, host, dest)]
517            rv = 0
518
519            try:
520                dnull = open("/dev/null", "w")
521            except IOError:
522                self.log.debug("[ssh_file]: failed to open " + \
523                        "/dev/null for redirect")
524                dnull = Null
525
526            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
527            if not self.debug:
528                rv = call(scp_cmd, stdout=dnull, stderr=dnull)
529
530            return rv == 0
531
532        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
533            """
534            Run a remote command on host as user.  If debug is set, the action
535            is only logged.  Commands are run without stdin, to avoid stray
536            SIGTTINs.
537            """
538            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
539                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
540                    (self.ssh_exec, self.ssh_privkey_file, 
541                            user, host, cmd)
542
543            try:
544                dnull = open("/dev/null", "r")
545            except IOError:
546                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
547                        "for redirect")
548                dnull = Null
549
550            self.log.debug("[ssh_cmd]: %s" % sh_str)
551            if not self.debug:
552                if dnull:
553                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
554                else:
555                    sub = Popen(sh_str, shell=True)
556                if timeout:
557                    i = 0
558                    rv = sub.poll()
559                    while i < timeout:
560                        if rv is not None: break
561                        else:
562                            time.sleep(1)
563                            rv = sub.poll()
564                            i += 1
565                    else:
566                        self.log.debug("Process exceeded runtime: %s" % sh_str)
567                        os.kill(sub.pid, signal.SIGKILL)
568                        raise self.ssh_cmd_timeout();
569                    return rv == 0
570                else:
571                    return sub.wait() == 0
572            else:
573                return True
574
575    class start_segment(emulab_segment):
576        def __init__(self, log=None, keyfile=None, debug=False):
577            experiment_control_local.emulab_segment.__init__(self,
578                    log=log, keyfile=keyfile, debug=debug)
579
580        def create_config_tree(self, src_dir, dest_dir, script):
581            """
582            Append commands to script that will create the directory hierarchy
583            on the remote federant.
584            """
585
586            if os.path.isdir(src_dir):
587                print >>script, "mkdir -p %s" % dest_dir
588                print >>script, "chmod 770 %s" % dest_dir
589
590                for f in os.listdir(src_dir):
591                    if os.path.isdir(f):
592                        self.create_config_tree("%s/%s" % (src_dir, f), 
593                                "%s/%s" % (dest_dir, f), script)
594            else:
595                self.log.debug("[create_config_tree]: Not a directory: %s" \
596                        % src_dir)
597
598        def ship_configs(self, host, user, src_dir, dest_dir):
599            """
600            Copy federant-specific configuration files to the federant.
601            """
602            for f in os.listdir(src_dir):
603                if os.path.isdir(f):
604                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
605                            "%s/%s" % (dest_dir, f)):
606                        return False
607                else:
608                    if not self.scp_file("%s/%s" % (src_dir, f), 
609                            user, host, dest_dir):
610                        return False
611            return True
612
613        def get_state(self, user, host, tb, pid, eid):
614            # command to test experiment state
615            expinfo_exec = "/usr/testbed/bin/expinfo" 
616            # Regular expressions to parse the expinfo response
617            state_re = re.compile("State:\s+(\w+)")
618            no_exp_re = re.compile("^No\s+such\s+experiment")
619            swapping_re = re.compile("^No\s+information\s+available.")
620            state = None    # Experiment state parsed from expinfo
621            # The expinfo ssh command.  Note the identity restriction to use
622            # only the identity provided in the pubkey given.
623            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
624                    'StrictHostKeyChecking yes', '-i', 
625                    self.ssh_privkey_file, "%s@%s" % (user, host), 
626                    expinfo_exec, pid, eid]
627
628            dev_null = None
629            try:
630                dev_null = open("/dev/null", "a")
631            except IOError, e:
632                self.log.error("[get_state]: can't open /dev/null: %s" %e)
633
634            if self.debug:
635                state = 'swapped'
636                rv = 0
637            else:
638                status = Popen(cmd, stdout=PIPE, stderr=dev_null)
639                for line in status.stdout:
640                    m = state_re.match(line)
641                    if m: state = m.group(1)
642                    else:
643                        for reg, st in ((no_exp_re, "none"),
644                                (swapping_re, "swapping")):
645                            m = reg.match(line)
646                            if m: state = st
647                rv = status.wait()
648
649            # If the experiment is not present the subcommand returns a
650            # non-zero return value.  If we successfully parsed a "none"
651            # outcome, ignore the return code.
652            if rv != 0 and state != 'none':
653                raise service_error(service_error.internal,
654                        "Cannot get status of segment %s:%s/%s" % \
655                                (tb, pid, eid))
656            elif state not in ('active', 'swapped', 'swapping', 'none'):
657                raise service_error(service_error.internal,
658                        "Cannot get status of segment %s:%s/%s" % \
659                                (tb, pid, eid))
660            else: return state
661
662
663        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
664            """
665            Start a sub-experiment on a federant.
666
667            Get the current state, modify or create as appropriate, ship data
668            and configs and start the experiment.  There are small ordering
669            differences based on the initial state of the sub-experiment.
670            """
671            # ops node in the federant
672            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
673            user = tbparams[tb]['user']     # federant user
674            pid = tbparams[tb]['project']   # federant project
675            # XXX
676            base_confs = ( "hosts",)
677            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
678            # Configuration directories on the remote machine
679            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
680            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
681            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
682
683            state = self.get_state(user, host, tb, pid, eid)
684
685            self.log.debug("[start_segment]: %s: %s" % (tb, state))
686            self.log.info("[start_segment]:transferring experiment to %s" % tb)
687
688            if not self.scp_file("%s/%s/%s" % \
689                    (tmpdir, tb, tclfile), user, host):
690                return False
691           
692            if state == 'none':
693                # Create a null copy of the experiment so that we capture any
694                # logs there if the modify fails.  Emulab software discards the
695                # logs from a failed startexp
696                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
697                    return False
698                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
699                timedout = False
700                try:
701                    if not self.ssh_cmd(user, host,
702                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
703                            "-e %s null.tcl") % (pid, eid), "startexp",
704                            timeout=60 * 10):
705                        return False
706                except self.ssh_cmd_timeout:
707                    timedout = True
708
709                if timedout:
710                    state = self.get_state(user, host, self.ssh_privkey_file, 
711                            tb, eid, pid)
712                    if state != "swapped":
713                        return False
714
715           
716            # Open up a temporary file to contain a script for setting up the
717            # filespace for the new experiment.
718            self.log.info("[start_segment]: creating script file")
719            try:
720                sf, scriptname = tempfile.mkstemp()
721                scriptfile = os.fdopen(sf, 'w')
722            except IOError:
723                return False
724
725            scriptbase = os.path.basename(scriptname)
726
727            # Script the filesystem changes
728            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
729            # Clear and create the tarfiles and rpm directories
730            for d in (tarfiles_dir, rpms_dir):
731                print >>scriptfile, "/bin/rm -rf %s/*" % d
732                print >>scriptfile, "mkdir -p %s" % d
733            print >>scriptfile, 'mkdir -p %s' % proj_dir
734            self.create_config_tree("%s/%s" % (tmpdir, tb),
735                    proj_dir, scriptfile)
736            if os.path.isdir("%s/tarfiles" % tmpdir):
737                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
738                        scriptfile)
739            if os.path.isdir("%s/rpms" % tmpdir):
740                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
741                        scriptfile)
742            print >>scriptfile, "rm -f %s" % scriptbase
743            scriptfile.close()
744
745            # Move the script to the remote machine
746            # XXX: could collide tempfile names on the remote host
747            if self.scp_file(scriptname, user, host, scriptbase):
748                os.remove(scriptname)
749            else:
750                return False
751
752            # Execute the script (and the script's last line deletes it)
753            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
754                return False
755
756            for f in base_confs:
757                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
758                        "%s/%s" % (proj_dir, f)):
759                    return False
760            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
761                    proj_dir):
762                return False
763            if os.path.isdir("%s/tarfiles" % tmpdir):
764                if not self.ship_configs(host, user,
765                        "%s/tarfiles" % tmpdir, tarfiles_dir):
766                    return False
767            if os.path.isdir("%s/rpms" % tmpdir):
768                if not self.ship_configs(host, user,
769                        "%s/rpms" % tmpdir, tarfiles_dir):
770                    return False
771            # Stage the new configuration (active experiments will stay swapped
772            # in now)
773            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
774            try:
775                if not self.ssh_cmd(user, host,
776                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
777                                (pid, eid, tclfile),
778                        "modexp", timeout= 60 * 10):
779                    return False
780            except self.ssh_cmd_timeout:
781                print "modexp timeout"
782                # There's really no way to see if this succeeded or failed, so
783                # if it hangs, assume the worst.
784                return False
785            # Active experiments are still swapped, this swaps the others in.
786            if state != 'active':
787                self.log.info("[start_segment]: Swapping %s in on %s" % \
788                        (eid, tb))
789                timedout = False
790                try:
791                    if not self.ssh_cmd(user, host,
792                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
793                            "swapexp", timeout=10*60):
794                        return False
795                except self.ssh_cmd_timeout:
796                    timedout = True
797               
798                # If the command was terminated, but completed successfully,
799                # report success.
800                if timedout:
801                    state = self.get_state(user, host, self.ssh_privkey_file,
802                            tb, eid, pid)
803                    self.log.debug("[start_segment]: swapin timed out (state)")
804                    return state == 'active'
805            # Everything has gone OK.
806            return True
807
808    class stop_segment(emulab_segment):
809        def __init__(self, log=None, keyfile=None, debug=False):
810            experiment_control_local.emulab_segment.__init__(self,
811                    log=log, keyfile=keyfile, debug=debug)
812
813        def __call__(self, tb, eid, tbparams):
814            """
815            Stop a sub experiment by calling swapexp on the federant
816            """
817            user = tbparams[tb]['user']
818            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
819            pid = tbparams[tb]['project']
820
821            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
822            return self.ssh_cmd(user, host,
823                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
824
825       
826    def generate_ssh_keys(self, dest, type="rsa" ):
827        """
828        Generate a set of keys for the gateways to use to talk.
829
830        Keys are of type type and are stored in the required dest file.
831        """
832        valid_types = ("rsa", "dsa")
833        t = type.lower();
834        if t not in valid_types: raise ValueError
835        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
836
837        try:
838            trace = open("/dev/null", "w")
839        except IOError:
840            raise service_error(service_error.internal,
841                    "Cannot open /dev/null??");
842
843        # May raise CalledProcessError
844        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
845        rv = call(cmd, stdout=trace, stderr=trace)
846        if rv != 0:
847            raise service_error(service_error.internal, 
848                    "Cannot generate nonce ssh keys.  %s return code %d" \
849                            % (self.ssh_keygen, rv))
850
851    def gentopo(self, str):
852        """
853        Generate the topology dtat structure from the splitter's XML
854        representation of it.
855
856        The topology XML looks like:
857            <experiment>
858                <nodes>
859                    <node><vname></vname><ips>ip1:ip2</ips></node>
860                </nodes>
861                <lans>
862                    <lan>
863                        <vname></vname><vnode></vnode><ip></ip>
864                        <bandwidth></bandwidth><member>node:port</member>
865                    </lan>
866                </lans>
867        """
868        class topo_parse:
869            """
870            Parse the topology XML and create the dats structure.
871            """
872            def __init__(self):
873                # Typing of the subelements for data conversion
874                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
875                self.int_subelements = ( 'bandwidth',)
876                self.float_subelements = ( 'delay',)
877                # The final data structure
878                self.nodes = [ ]
879                self.lans =  [ ]
880                self.topo = { \
881                        'node': self.nodes,\
882                        'lan' : self.lans,\
883                    }
884                self.element = { }  # Current element being created
885                self.chars = ""     # Last text seen
886
887            def end_element(self, name):
888                # After each sub element the contents is added to the current
889                # element or to the appropriate list.
890                if name == 'node':
891                    self.nodes.append(self.element)
892                    self.element = { }
893                elif name == 'lan':
894                    self.lans.append(self.element)
895                    self.element = { }
896                elif name in self.str_subelements:
897                    self.element[name] = self.chars
898                    self.chars = ""
899                elif name in self.int_subelements:
900                    self.element[name] = int(self.chars)
901                    self.chars = ""
902                elif name in self.float_subelements:
903                    self.element[name] = float(self.chars)
904                    self.chars = ""
905
906            def found_chars(self, data):
907                self.chars += data.rstrip()
908
909
910        tp = topo_parse();
911        parser = xml.parsers.expat.ParserCreate()
912        parser.EndElementHandler = tp.end_element
913        parser.CharacterDataHandler = tp.found_chars
914
915        parser.Parse(str)
916
917        return tp.topo
918       
919
920    def genviz(self, topo):
921        """
922        Generate the visualization the virtual topology
923        """
924
925        neato = "/usr/local/bin/neato"
926        # These are used to parse neato output and to create the visualization
927        # file.
928        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
929        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
930                "%s</type></node>"
931
932        try:
933            # Node names
934            nodes = [ n['vname'] for n in topo['node'] ]
935            topo_lans = topo['lan']
936        except KeyError:
937            raise service_error(service_error.internal, "Bad topology")
938
939        lans = { }
940        links = { }
941
942        # Walk through the virtual topology, organizing the connections into
943        # 2-node connections (links) and more-than-2-node connections (lans).
944        # When a lan is created, it's added to the list of nodes (there's a
945        # node in the visualization for the lan).
946        for l in topo_lans:
947            if links.has_key(l['vname']):
948                if len(links[l['vname']]) < 2:
949                    links[l['vname']].append(l['vnode'])
950                else:
951                    nodes.append(l['vname'])
952                    lans[l['vname']] = links[l['vname']]
953                    del links[l['vname']]
954                    lans[l['vname']].append(l['vnode'])
955            elif lans.has_key(l['vname']):
956                lans[l['vname']].append(l['vnode'])
957            else:
958                links[l['vname']] = [ l['vnode'] ]
959
960
961        # Open up a temporary file for dot to turn into a visualization
962        try:
963            df, dotname = tempfile.mkstemp()
964            dotfile = os.fdopen(df, 'w')
965        except IOError:
966            raise service_error(service_error.internal,
967                    "Failed to open file in genviz")
968
969        # Generate a dot/neato input file from the links, nodes and lans
970        try:
971            print >>dotfile, "graph G {"
972            for n in nodes:
973                print >>dotfile, '\t"%s"' % n
974            for l in links.keys():
975                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
976            for l in lans.keys():
977                for n in lans[l]:
978                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
979            print >>dotfile, "}"
980            dotfile.close()
981        except TypeError:
982            raise service_error(service_error.internal,
983                    "Single endpoint link in vtopo")
984        except IOError:
985            raise service_error(service_error.internal, "Cannot write dot file")
986
987        # Use dot to create a visualization
988        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
989                '-Gpack=true', dotname], stdout=PIPE)
990
991        # Translate dot to vis format
992        vis_nodes = [ ]
993        vis = { 'node': vis_nodes }
994        for line in dot.stdout:
995            m = vis_re.match(line)
996            if m:
997                vn = m.group(1)
998                vis_node = {'name': vn, \
999                        'x': float(m.group(2)),\
1000                        'y' : float(m.group(3)),\
1001                    }
1002                if vn in links.keys() or vn in lans.keys():
1003                    vis_node['type'] = 'lan'
1004                else:
1005                    vis_node['type'] = 'node'
1006                vis_nodes.append(vis_node)
1007        rv = dot.wait()
1008
1009        os.remove(dotname)
1010        if rv == 0 : return vis
1011        else: return None
1012
1013    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1014            access_user):
1015        """
1016        Get access to testbed through fedd and set the parameters for that tb
1017        """
1018        uri = self.tbmap.get(tb, None)
1019        if not uri:
1020            raise service_error(serice_error.server_config, 
1021                    "Unknown testbed: %s" % tb)
1022
1023        # currently this lumps all users into one service access group
1024        service_keys = [ a for u in user \
1025                for a in u.get('access', []) \
1026                    if a.has_key('sshPubkey')]
1027
1028        if len(service_keys) == 0:
1029            raise service_error(service_error.req, 
1030                    "Must have at least one SSH pubkey for services")
1031
1032
1033        for p, u in access_user:
1034            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1035                    "to %s") %  ((p or "None"), u, uri))
1036
1037            if p:
1038                # Request with user and project specified
1039                req = {\
1040                        'destinationTestbed' : { 'uri' : uri },
1041                        'project': { 
1042                            'name': {'localname': p},
1043                            'user': [ {'userID': { 'localname': u } } ],
1044                            },
1045                        'user':  user,
1046                        'allocID' : { 'localname': 'test' },
1047                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1048                        'serviceAccess' : service_keys
1049                    }
1050            else:
1051                # Request with only user specified
1052                req = {\
1053                        'destinationTestbed' : { 'uri' : uri },
1054                        'user':  [ {'userID': { 'localname': u } } ],
1055                        'allocID' : { 'localname': 'test' },
1056                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1057                        'serviceAccess' : service_keys
1058                    }
1059
1060            if tb == master:
1061                # NB, the export_project parameter is a dict that includes
1062                # the type
1063                req['exportProject'] = export_project
1064
1065            # node resources if any
1066            if nodes != None and len(nodes) > 0:
1067                rnodes = [ ]
1068                for n in nodes:
1069                    rn = { }
1070                    image, hw, count = n.split(":")
1071                    if image: rn['image'] = [ image ]
1072                    if hw: rn['hardware'] = [ hw ]
1073                    if count and int(count) >0 : rn['count'] = int(count)
1074                    rnodes.append(rn)
1075                req['resources']= { }
1076                req['resources']['node'] = rnodes
1077
1078            try:
1079                if self.local_access.has_key(uri):
1080                    # Local access call
1081                    req = { 'RequestAccessRequestBody' : req }
1082                    r = self.local_access[uri].RequestAccess(req, 
1083                            fedid(file=self.cert_file))
1084                    r = { 'RequestAccessResponseBody' : r }
1085                else:
1086                    r = self.call_RequestAccess(uri, req, 
1087                            self.cert_file, self.cert_pwd, self.trusted_certs)
1088            except service_error, e:
1089                if e.code == service_error.access:
1090                    self.log.debug("[get_access] Access denied")
1091                    r = None
1092                    continue
1093                else:
1094                    raise e
1095
1096            if r.has_key('RequestAccessResponseBody'):
1097                # Through to here we have a valid response, not a fault.
1098                # Access denied is a fault, so something better or worse than
1099                # access denied has happened.
1100                r = r['RequestAccessResponseBody']
1101                self.log.debug("[get_access] Access granted")
1102                break
1103            else:
1104                raise service_error(service_error.protocol,
1105                        "Bad proxy response")
1106       
1107        if not r:
1108            raise service_error(service_error.access, 
1109                    "Access denied by %s (%s)" % (tb, uri))
1110
1111        e = r['emulab']
1112        p = e['project']
1113        tbparam[tb] = { 
1114                "boss": e['boss'],
1115                "host": e['ops'],
1116                "domain": e['domain'],
1117                "fs": e['fileServer'],
1118                "eventserver": e['eventServer'],
1119                "project": unpack_id(p['name']),
1120                "emulab" : e,
1121                "allocID" : r['allocID'],
1122                }
1123        # Make the testbed name be the label the user applied
1124        p['testbed'] = {'localname': tb }
1125
1126        for u in p['user']:
1127            role = u.get('role', None)
1128            if role == 'experimentCreation':
1129                tbparam[tb]['user'] = unpack_id(u['userID'])
1130                break
1131        else:
1132            raise service_error(service_error.internal, 
1133                    "No createExperimentUser from %s" %tb)
1134
1135        # Add attributes to barameter space.  We don't allow attributes to
1136        # overlay any parameters already installed.
1137        for a in e['fedAttr']:
1138            try:
1139                if a['attribute'] and isinstance(a['attribute'], basestring)\
1140                        and not tbparam[tb].has_key(a['attribute'].lower()):
1141                    tbparam[tb][a['attribute'].lower()] = a['value']
1142            except KeyError:
1143                self.log.error("Bad attribute in response: %s" % a)
1144       
1145    def release_access(self, tb, aid):
1146        """
1147        Release access to testbed through fedd
1148        """
1149
1150        uri = self.tbmap.get(tb, None)
1151        if not uri:
1152            raise service_error(serice_error.server_config, 
1153                    "Unknown testbed: %s" % tb)
1154
1155        if self.local_access.has_key(uri):
1156            resp = self.local_access[uri].ReleaseAccess(\
1157                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1158                    fedid(file=self.cert_file))
1159            resp = { 'ReleaseAccessResponseBody': resp } 
1160        else:
1161            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1162                    self.cert_file, self.cert_pwd, self.trusted_certs)
1163
1164        # better error coding
1165
1166    def remote_splitter(self, uri, desc, master):
1167
1168        req = {
1169                'description' : { 'ns2description': desc },
1170                'master': master,
1171                'include_fedkit': bool(self.fedkit),
1172                'include_gatewaykit': bool(self.gatewaykit)
1173            }
1174
1175        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1176                self.trusted_certs)
1177
1178        if r.has_key('Ns2SplitResponseBody'):
1179            r = r['Ns2SplitResponseBody']
1180            if r.has_key('output'):
1181                return r['output'].splitlines()
1182            else:
1183                raise service_error(service_error.protocol, 
1184                        "Bad splitter response (no output)")
1185        else:
1186            raise service_error(service_error.protocol, "Bad splitter response")
1187       
1188    class current_testbed:
1189        """
1190        Object for collecting the current testbed description.  The testbed
1191        description is saved to a file with the local testbed variables
1192        subsittuted line by line.
1193        """
1194        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1195            def tar_list_to_string(tl):
1196                if tl is None: return None
1197
1198                rv = ""
1199                for t in tl:
1200                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1201                            (t[0], os.path.basename(t[1]))
1202                return rv
1203
1204
1205            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1206            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1207            self.current_testbed = None
1208            self.testbed_file = None
1209
1210            self.def_expstart = \
1211                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1212            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1213            self.def_gwstart = \
1214                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1215            self.def_mgwstart = \
1216                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1217            self.def_gwimage = "FBSD61-TUNNEL2";
1218            self.def_gwtype = "pc";
1219            self.def_mgwcmd = '# '
1220            self.def_mgwcmdparams = ''
1221            self.def_gwcmd = '# '
1222            self.def_gwcmdparams = ''
1223
1224            self.eid = eid
1225            self.tmpdir = tmpdir
1226            # Convert fedkit and gateway kit (which are lists of tuples) into a
1227            # substituition string.
1228            self.fedkit = tar_list_to_string(fedkit)
1229            self.gatewaykit = tar_list_to_string(gatewaykit)
1230
1231        def __call__(self, line, master, allocated, tbparams):
1232            # Capture testbed topology descriptions
1233            if self.current_testbed == None:
1234                m = self.begin_testbed.match(line)
1235                if m != None:
1236                    self.current_testbed = m.group(1)
1237                    if self.current_testbed == None:
1238                        raise service_error(service_error.req,
1239                                "Bad request format (unnamed testbed)")
1240                    allocated[self.current_testbed] = \
1241                            allocated.get(self.current_testbed,0) + 1
1242                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1243                    if not os.path.exists(tb_dir):
1244                        try:
1245                            os.mkdir(tb_dir)
1246                        except IOError:
1247                            raise service_error(service_error.internal,
1248                                    "Cannot create %s" % tb_dir)
1249                    try:
1250                        self.testbed_file = open("%s/%s.%s.tcl" %
1251                                (tb_dir, self.eid, self.current_testbed), 'w')
1252                    except IOError:
1253                        self.testbed_file = None
1254                    return True
1255                else: return False
1256            else:
1257                m = self.end_testbed.match(line)
1258                if m != None:
1259                    if m.group(1) != self.current_testbed:
1260                        raise service_error(service_error.internal, 
1261                                "Mismatched testbed markers!?")
1262                    if self.testbed_file != None: 
1263                        self.testbed_file.close()
1264                        self.testbed_file = None
1265                    self.current_testbed = None
1266                elif self.testbed_file:
1267                    # Substitute variables and put the line into the local
1268                    # testbed file.
1269                    gwtype = tbparams[self.current_testbed].get(\
1270                            'connectortype', self.def_gwtype)
1271                    gwimage = tbparams[self.current_testbed].get(\
1272                            'connectorimage', self.def_gwimage)
1273                    mgwstart = tbparams[self.current_testbed].get(\
1274                            'masterconnectorstartcmd', self.def_mgwstart)
1275                    mexpstart = tbparams[self.current_testbed].get(\
1276                            'masternodestartcmd', self.def_mexpstart)
1277                    gwstart = tbparams[self.current_testbed].get(\
1278                            'slaveconnectorstartcmd', self.def_gwstart)
1279                    expstart = tbparams[self.current_testbed].get(\
1280                            'slavenodestartcmd', self.def_expstart)
1281                    project = tbparams[self.current_testbed].get('project')
1282                    gwcmd = tbparams[self.current_testbed].get(\
1283                            'slaveconnectorcmd', self.def_gwcmd)
1284                    gwcmdparams = tbparams[self.current_testbed].get(\
1285                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1286                    mgwcmd = tbparams[self.current_testbed].get(\
1287                            'masterconnectorcmd', self.def_gwcmd)
1288                    mgwcmdparams = tbparams[self.current_testbed].get(\
1289                            'masterconnectorcmdparams', self.def_gwcmdparams)
1290                    line = re.sub("GWTYPE", gwtype, line)
1291                    line = re.sub("GWIMAGE", gwimage, line)
1292                    if self.current_testbed == master:
1293                        line = re.sub("GWSTART", mgwstart, line)
1294                        line = re.sub("EXPSTART", mexpstart, line)
1295                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1296                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1297                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1298                    else:
1299                        line = re.sub("GWSTART", gwstart, line)
1300                        line = re.sub("EXPSTART", expstart, line)
1301                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1302                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1303                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1304                    #These expansions contain EID and PROJDIR.  NB these are
1305                    # local fedkit and gatewaykit, which are strings.
1306                    if self.fedkit:
1307                        line = re.sub("FEDKIT", self.fedkit, line)
1308                    if self.gatewaykit:
1309                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1310                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1311                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1312                    line = re.sub("EID", self.eid, line)
1313                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1314                            (project, self.eid), line)
1315                    print >>self.testbed_file, line
1316                return True
1317
1318    class allbeds:
1319        """
1320        Process the Allbeds section.  Get access to each federant and save the
1321        parameters in tbparams
1322        """
1323        def __init__(self, get_access):
1324            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1325            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1326            self.in_allbeds = False
1327            self.get_access = get_access
1328
1329        def __call__(self, line, user, tbparams, master, export_project,
1330                access_user):
1331            # Testbed access parameters
1332            if not self.in_allbeds:
1333                if self.begin_allbeds.match(line):
1334                    self.in_allbeds = True
1335                    return True
1336                else:
1337                    return False
1338            else:
1339                if self.end_allbeds.match(line):
1340                    self.in_allbeds = False
1341                else:
1342                    nodes = line.split('|')
1343                    tb = nodes.pop(0)
1344                    self.get_access(tb, nodes, user, tbparams, master,
1345                            export_project, access_user)
1346                return True
1347
1348    class gateways:
1349        def __init__(self, eid, master, tmpdir, gw_pubkey,
1350                gw_secretkey, copy_file, fedkit):
1351            self.begin_gateways = \
1352                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1353            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1354            self.current_gateways = None
1355            self.control_gateway = None
1356            self.active_end = { }
1357
1358            self.eid = eid
1359            self.master = master
1360            self.tmpdir = tmpdir
1361            self.gw_pubkey_base = gw_pubkey
1362            self.gw_secretkey_base = gw_secretkey
1363
1364            self.copy_file = copy_file
1365            self.fedkit = fedkit
1366
1367
1368        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1369                active_end, tbparams, dtb, myname, desthost, type):
1370            """
1371            Produce a gateway configuration file from a gateways line.
1372            """
1373
1374            sproject = tbparams[gw].get('project', 'project')
1375            dproject = tbparams[dtb].get('project', 'project')
1376            sdomain = ".%s.%s%s" % (eid, sproject,
1377                    tbparams[gw].get('domain', ".example.com"))
1378            ddomain = ".%s.%s%s" % (eid, dproject,
1379                    tbparams[dtb].get('domain', ".example.com"))
1380            boss = tbparams[master].get('boss', "boss")
1381            fs = tbparams[master].get('fs', "fs")
1382            event_server = "%s%s" % \
1383                    (tbparams[gw].get('eventserver', "event_server"),
1384                            tbparams[gw].get('domain', "example.com"))
1385            remote_event_server = "%s%s" % \
1386                    (tbparams[dtb].get('eventserver', "event_server"),
1387                            tbparams[dtb].get('domain', "example.com"))
1388            seer_control = "%s%s" % \
1389                    (tbparams[gw].get('control', "control"), sdomain)
1390            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1391
1392            if self.fedkit:
1393                remote_script_dir = "/usr/local/federation/bin"
1394                local_script_dir = "/usr/local/federation/bin"
1395            else:
1396                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1397                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1398
1399            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1400            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1401            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1402
1403            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1404            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1405
1406            # translate to lower case so the `hostname` hack for specifying
1407            # configuration files works.
1408            conf_file = conf_file.lower();
1409            remote_conf_file = remote_conf_file.lower();
1410
1411            if dtb == master:
1412                active = "false"
1413            elif gw == master:
1414                active = "true"
1415            elif active_end.has_key('%s-%s' % (dtb, gw)):
1416                active = "false"
1417            else:
1418                active_end['%s-%s' % (gw, dtb)] = 1
1419                active = "true"
1420
1421            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1422            print >>gwconfig, "Active: %s" % active
1423            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1424            if tunnel_iface:
1425                print >>gwconfig, "Interface: %s" % tunnel_iface
1426            print >>gwconfig, "BossName: %s" % boss
1427            print >>gwconfig, "FsName: %s" % fs
1428            print >>gwconfig, "EventServerName: %s" % event_server
1429            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1430            print >>gwconfig, "SeerControl: %s" % seer_control
1431            print >>gwconfig, "Type: %s" % type
1432            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1433            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1434                    local_script_dir
1435            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1436            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1437            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1438                    (remote_conf_dir, remote_conf_file)
1439            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1440            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1441            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1442            gwconfig.close()
1443
1444            return active == "true"
1445
1446        def __call__(self, line, allocated, tbparams):
1447            # Process gateways
1448            if not self.current_gateways:
1449                m = self.begin_gateways.match(line)
1450                if m:
1451                    self.current_gateways = m.group(1)
1452                    if allocated.has_key(self.current_gateways):
1453                        # This test should always succeed
1454                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1455                        if not os.path.exists(tb_dir):
1456                            try:
1457                                os.mkdir(tb_dir)
1458                            except IOError:
1459                                raise service_error(service_error.internal,
1460                                        "Cannot create %s" % tb_dir)
1461                    else:
1462                        # XXX
1463                        self.log.error("[gateways]: Ignoring gateways for " + \
1464                                "unknown testbed %s" % self.current_gateways)
1465                        self.current_gateways = None
1466                    return True
1467                else:
1468                    return False
1469            else:
1470                m = self.end_gateways.match(line)
1471                if m :
1472                    if m.group(1) != self.current_gateways:
1473                        raise service_error(service_error.internal,
1474                                "Mismatched gateway markers!?")
1475                    if self.control_gateway:
1476                        try:
1477                            cc = open("%s/%s/client.conf" %
1478                                    (self.tmpdir, self.current_gateways), 'w')
1479                            print >>cc, "ControlGateway: %s" % \
1480                                    self.control_gateway
1481                            if tbparams[self.master].has_key('smbshare'):
1482                                print >>cc, "SMBSHare: %s" % \
1483                                        tbparams[self.master]['smbshare']
1484                            print >>cc, "ProjectUser: %s" % \
1485                                    tbparams[self.master]['user']
1486                            print >>cc, "ProjectName: %s" % \
1487                                    tbparams[self.master]['project']
1488                            print >>cc, "ExperimentID: %s/%s" % \
1489                                    ( tbparams[self.master]['project'], \
1490                                    self.eid )
1491                            cc.close()
1492                        except IOError:
1493                            raise service_error(service_error.internal,
1494                                    "Error creating client config")
1495                        # XXX: This seer specific file should disappear
1496                        try:
1497                            cc = open("%s/%s/seer.conf" %
1498                                    (self.tmpdir, self.current_gateways),
1499                                    'w')
1500                            if self.current_gateways != self.master:
1501                                print >>cc, "ControlNode: %s" % \
1502                                        self.control_gateway
1503                            print >>cc, "ExperimentID: %s/%s" % \
1504                                    ( tbparams[self.master]['project'], \
1505                                    self.eid )
1506                            cc.close()
1507                        except IOError:
1508                            raise service_error(service_error.internal,
1509                                    "Error creating seer config")
1510                    else:
1511                        debug.error("[gateways]: No control gateway for %s" %\
1512                                    self.current_gateways)
1513                    self.current_gateways = None
1514                else:
1515                    dtb, myname, desthost, type = line.split(" ")
1516
1517                    if type == "control" or type == "both":
1518                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1519                                self.eid, 
1520                                tbparams[self.current_gateways]['project'],
1521                                tbparams[self.current_gateways]['domain'])
1522                    try:
1523                        active = self.gateway_conf_file(self.current_gateways,
1524                                self.master, self.eid, self.gw_pubkey_base,
1525                                self.gw_secretkey_base,
1526                                self.active_end, tbparams, dtb, myname,
1527                                desthost, type)
1528                    except IOError, e:
1529                        raise service_error(service_error.internal,
1530                                "Failed to write config file for %s" % \
1531                                        self.current_gateway)
1532           
1533                    gw_pubkey = "%s/keys/%s" % \
1534                            (self.tmpdir, self.gw_pubkey_base)
1535                    gw_secretkey = "%s/keys/%s" % \
1536                            (self.tmpdir, self.gw_secretkey_base)
1537
1538                    pkfile = "%s/%s/%s" % \
1539                            ( self.tmpdir, self.current_gateways, 
1540                                    self.gw_pubkey_base)
1541                    skfile = "%s/%s/%s" % \
1542                            ( self.tmpdir, self.current_gateways, 
1543                                    self.gw_secretkey_base)
1544
1545                    if not os.path.exists(pkfile):
1546                        try:
1547                            self.copy_file(gw_pubkey, pkfile)
1548                        except IOError:
1549                            service_error(service_error.internal,
1550                                    "Failed to copy pubkey file")
1551
1552                    if active and not os.path.exists(skfile):
1553                        try:
1554                            self.copy_file(gw_secretkey, skfile)
1555                        except IOError:
1556                            service_error(service_error.internal,
1557                                    "Failed to copy secretkey file")
1558                return True
1559
1560    class shunt_to_file:
1561        """
1562        Simple class to write data between two regexps to a file.
1563        """
1564        def __init__(self, begin, end, filename):
1565            """
1566            Begin shunting on a match of begin, stop on end, send data to
1567            filename.
1568            """
1569            self.begin = re.compile(begin)
1570            self.end = re.compile(end)
1571            self.in_shunt = False
1572            self.file = None
1573            self.filename = filename
1574
1575        def __call__(self, line):
1576            """
1577            Call this on each line in the input that may be shunted.
1578            """
1579            if not self.in_shunt:
1580                if self.begin.match(line):
1581                    self.in_shunt = True
1582                    try:
1583                        self.file = open(self.filename, "w")
1584                    except:
1585                        self.file = None
1586                        raise
1587                    return True
1588                else:
1589                    return False
1590            else:
1591                if self.end.match(line):
1592                    if self.file: 
1593                        self.file.close()
1594                        self.file = None
1595                    self.in_shunt = False
1596                else:
1597                    if self.file:
1598                        print >>self.file, line
1599                return True
1600
1601    class shunt_to_list:
1602        """
1603        Same interface as shunt_to_file.  Data collected in self.list, one list
1604        element per line.
1605        """
1606        def __init__(self, begin, end):
1607            self.begin = re.compile(begin)
1608            self.end = re.compile(end)
1609            self.in_shunt = False
1610            self.list = [ ]
1611       
1612        def __call__(self, line):
1613            if not self.in_shunt:
1614                if self.begin.match(line):
1615                    self.in_shunt = True
1616                    return True
1617                else:
1618                    return False
1619            else:
1620                if self.end.match(line):
1621                    self.in_shunt = False
1622                else:
1623                    self.list.append(line)
1624                return True
1625
1626    class shunt_to_string:
1627        """
1628        Same interface as shunt_to_file.  Data collected in self.str, all in
1629        one string.
1630        """
1631        def __init__(self, begin, end):
1632            self.begin = re.compile(begin)
1633            self.end = re.compile(end)
1634            self.in_shunt = False
1635            self.str = ""
1636       
1637        def __call__(self, line):
1638            if not self.in_shunt:
1639                if self.begin.match(line):
1640                    self.in_shunt = True
1641                    return True
1642                else:
1643                    return False
1644            else:
1645                if self.end.match(line):
1646                    self.in_shunt = False
1647                else:
1648                    self.str += line
1649                return True
1650
1651    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1652            tbparams, tmpdir, alloc_log=None):
1653        started = { }           # Testbeds where a sub-experiment started
1654                                # successfully
1655
1656        # XXX
1657        fail_soft = False
1658
1659        log = alloc_log or self.log
1660
1661        thread_pool = self.thread_pool(self.nthreads)
1662        threads = [ ]
1663
1664        for tb in [ k for k in allocated.keys() if k != master]:
1665            # Create and start a thread to start the segment, and save it to
1666            # get the return value later
1667            thread_pool.wait_for_slot()
1668            t  = self.pooled_thread(\
1669                    target=self.start_segment(log=log,
1670                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1671                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1672                    pdata=thread_pool, trace_file=self.trace_file)
1673            threads.append(t)
1674            t.start()
1675
1676        # Wait until all finish
1677        thread_pool.wait_for_all_done()
1678
1679        # If none failed, start the master
1680        failed = [ t.getName() for t in threads if not t.rv ]
1681
1682        if len(failed) == 0:
1683            starter = self.start_segment(log=log, 
1684                    keyfile=self.ssh_privkey_file, debug=self.debug)
1685            if not starter(master, eid, tbparams, tmpdir):
1686                failed.append(master)
1687
1688        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1689        # If one failed clean up, unless fail_soft is set
1690        if failed:
1691            if not fail_soft:
1692                thread_pool.clear()
1693                for tb in succeeded:
1694                    # Create and start a thread to stop the segment
1695                    thread_pool.wait_for_slot()
1696                    t  = self.pooled_thread(\
1697                            target=self.stop_segment(log=log,
1698                                keyfile=self.ssh_privkey_file,
1699                                debug=self.debug), 
1700                            args=(tb, eid, tbparams), name=tb,
1701                            pdata=thread_pool, trace_file=self.trace_file)
1702                    t.start()
1703                # Wait until all finish
1704                thread_pool.wait_for_all_done()
1705
1706                # release the allocations
1707                for tb in tbparams.keys():
1708                    self.release_access(tb, tbparams[tb]['allocID'])
1709                # Remove the placeholder
1710                self.state_lock.acquire()
1711                self.state[eid]['experimentStatus'] = 'failed'
1712                if self.state_filename: self.write_state()
1713                self.state_lock.release()
1714
1715                #raise service_error(service_error.federant,
1716                #    "Swap in failed on %s" % ",".join(failed))
1717                log.error("Swap in failed on %s" % ",".join(failed))
1718                return
1719        else:
1720            log.info("[start_segment]: Experiment %s active" % eid)
1721
1722        log.debug("[start_experiment]: removing %s" % tmpdir)
1723
1724        # Walk up tmpdir, deleting as we go
1725        for path, dirs, files in os.walk(tmpdir, topdown=False):
1726            for f in files:
1727                os.remove(os.path.join(path, f))
1728            for d in dirs:
1729                os.rmdir(os.path.join(path, d))
1730        os.rmdir(tmpdir)
1731
1732        # Insert the experiment into our state and update the disk copy
1733        self.state_lock.acquire()
1734        self.state[expid]['experimentStatus'] = 'active'
1735        self.state[eid] = self.state[expid]
1736        if self.state_filename: self.write_state()
1737        self.state_lock.release()
1738        return
1739
1740    def create_experiment(self, req, fid):
1741        """
1742        The external interface to experiment creation called from the
1743        dispatcher.
1744
1745        Creates a working directory, splits the incoming description using the
1746        splitter script and parses out the avrious subsections using the
1747        lcasses above.  Once each sub-experiment is created, use pooled threads
1748        to instantiate them and start it all up.
1749        """
1750
1751        if not self.auth.check_attribute(fid, 'create'):
1752            raise service_error(service_error.access, "Create access denied")
1753
1754        try:
1755            tmpdir = tempfile.mkdtemp(prefix="split-")
1756        except IOError:
1757            raise service_error(service_error.internal, "Cannot create tmp dir")
1758
1759        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1760        gw_secretkey_base = "fed.%s" % self.ssh_type
1761        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1762        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1763        tclfile = tmpdir + "/experiment.tcl"
1764        tbparams = { }
1765        try:
1766            access_user = self.accessdb[fid]
1767        except KeyError:
1768            raise service_error(service_error.internal,
1769                    "Access map and authorizer out of sync in " + \
1770                            "create_experiment for fedid %s"  % fid)
1771
1772        pid = "dummy"
1773        gid = "dummy"
1774        try:
1775            os.mkdir(tmpdir+"/keys")
1776        except OSError:
1777            raise service_error(service_error.internal,
1778                    "Can't make temporary dir")
1779
1780        req = req.get('CreateRequestBody', None)
1781        if not req:
1782            raise service_error(service_error.req,
1783                    "Bad request format (no CreateRequestBody)")
1784        # The tcl parser needs to read a file so put the content into that file
1785        descr=req.get('experimentdescription', None)
1786        if descr:
1787            file_content=descr.get('ns2description', None)
1788            if file_content:
1789                try:
1790                    f = open(tclfile, 'w')
1791                    f.write(file_content)
1792                    f.close()
1793                except IOError:
1794                    raise service_error(service_error.internal,
1795                            "Cannot write temp experiment description")
1796            else:
1797                raise service_error(service_error.req, 
1798                        "Only ns2descriptions supported")
1799        else:
1800            raise service_error(service_error.req, "No experiment description")
1801
1802        # Generate an ID for the experiment (slice) and a certificate that the
1803        # allocator can use to prove they own it.  We'll ship it back through
1804        # the encrypted connection.
1805        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1806
1807        if req.has_key('experimentID') and \
1808                req['experimentID'].has_key('localname'):
1809            eid = req['experimentID']['localname']
1810            self.state_lock.acquire()
1811            while (self.state.has_key(eid)):
1812                eid += random.choice(string.ascii_letters)
1813            # Initial state
1814            self.state[eid] = {
1815                    'experimentID' : \
1816                            [ { 'localname' : eid }, {'fedid': expid } ],
1817                    'experimentStatus': 'starting',
1818                    'experimentAccess': { 'X509' : expcert },
1819                    'owner': fid,
1820                    'log' : [],
1821                }
1822            self.state[expid] = self.state[eid]
1823            if self.state_filename: self.write_state()
1824            self.state_lock.release()
1825        else:
1826            eid = self.exp_stem
1827            for i in range(0,5):
1828                eid += random.choice(string.ascii_letters)
1829            self.state_lock.acquire()
1830            while (self.state.has_key(eid)):
1831                eid = self.exp_stem
1832                for i in range(0,5):
1833                    eid += random.choice(string.ascii_letters)
1834            # Initial state
1835            self.state[eid] = {
1836                    'experimentID' : \
1837                            [ { 'localname' : eid }, {'fedid': expid } ],
1838                    'experimentStatus': 'starting',
1839                    'experimentAccess': { 'X509' : expcert },
1840                    'owner': fid,
1841                    'log' : [],
1842                }
1843            self.state[expid] = self.state[eid]
1844            if self.state_filename: self.write_state()
1845            self.state_lock.release()
1846
1847        try: 
1848            # This catches exceptions to clear the placeholder if necessary
1849            try:
1850                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1851            except ValueError:
1852                raise service_error(service_error.server_config, 
1853                        "Bad key type (%s)" % self.ssh_type)
1854
1855            user = req.get('user', None)
1856            if user == None:
1857                raise service_error(service_error.req, "No user")
1858
1859            master = req.get('master', None)
1860            if not master:
1861                raise service_error(service_error.req,
1862                        "No master testbed label")
1863            export_project = req.get('exportProject', None)
1864            if not export_project:
1865                raise service_error(service_error.req, "No export project")
1866           
1867            if self.splitter_url:
1868                self.log.debug("Calling remote splitter at %s" % \
1869                        self.splitter_url)
1870                split_data = self.remote_splitter(self.splitter_url,
1871                        file_content, master)
1872            else:
1873                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1874                    str(self.muxmax), '-m', master]
1875
1876                if self.fedkit:
1877                    tclcmd.append('-k')
1878
1879                if self.gatewaykit:
1880                    tclcmd.append('-K')
1881
1882                tclcmd.extend([pid, gid, eid, tclfile])
1883
1884                self.log.debug("running local splitter %s", " ".join(tclcmd))
1885                tclparser = Popen(tclcmd, stdout=PIPE)
1886                split_data = tclparser.stdout
1887
1888            allocated = { }         # Testbeds we can access
1889            # Objects to parse the splitter output (defined above)
1890            parse_current_testbed = self.current_testbed(eid, tmpdir,
1891                    self.fedkit, self.gatewaykit)
1892            parse_allbeds = self.allbeds(self.get_access)
1893            parse_gateways = self.gateways(eid, master, tmpdir,
1894                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1895                    self.fedkit)
1896            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1897                        "^#\s+End\s+Vtopo")
1898            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1899                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1900            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1901                    "^#\s+End\s+tarfiles")
1902            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1903                    "^#\s+End\s+rpms")
1904
1905            # Working on the split data
1906            for line in split_data:
1907                line = line.rstrip()
1908                if parse_current_testbed(line, master, allocated, tbparams):
1909                    continue
1910                elif parse_allbeds(line, user, tbparams, master, export_project,
1911                        access_user):
1912                    continue
1913                elif parse_gateways(line, allocated, tbparams):
1914                    continue
1915                elif parse_vtopo(line):
1916                    continue
1917                elif parse_hostnames(line):
1918                    continue
1919                elif parse_tarfiles(line):
1920                    continue
1921                elif parse_rpms(line):
1922                    continue
1923                else:
1924                    raise service_error(service_error.internal, 
1925                            "Bad tcl parse? %s" % line)
1926            # Virtual topology and visualization
1927            vtopo = self.gentopo(parse_vtopo.str)
1928            if not vtopo:
1929                raise service_error(service_error.internal, 
1930                        "Failed to generate virtual topology")
1931
1932            vis = self.genviz(vtopo)
1933            if not vis:
1934                raise service_error(service_error.internal, 
1935                        "Failed to generate visualization")
1936
1937           
1938            # save federant information
1939            for k in allocated.keys():
1940                tbparams[k]['federant'] = {\
1941                        'name': [ { 'localname' : eid} ],\
1942                        'emulab': tbparams[k]['emulab'],\
1943                        'allocID' : tbparams[k]['allocID'],\
1944                        'master' : k == master,\
1945                    }
1946
1947            self.state_lock.acquire()
1948            self.state[eid]['vtopo'] = vtopo
1949            self.state[eid]['vis'] = vis
1950            self.state[expid]['federant'] = \
1951                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1952                        if tbparams[tb].has_key('federant') ]
1953            if self.state_filename: self.write_state()
1954            self.state_lock.release()
1955
1956            # Copy tarfiles and rpms needed at remote sites into a staging area
1957            try:
1958                if self.fedkit:
1959                    for t in self.fedkit:
1960                        parse_tarfiles.list.append(t[1])
1961                if self.gatewaykit:
1962                    for t in self.gatewaykit:
1963                        parse_tarfiles.list.append(t[1])
1964                for t in parse_tarfiles.list:
1965                    if not os.path.exists("%s/tarfiles" % tmpdir):
1966                        os.mkdir("%s/tarfiles" % tmpdir)
1967                    self.copy_file(t, "%s/tarfiles/%s" % \
1968                            (tmpdir, os.path.basename(t)))
1969                for r in parse_rpms.list:
1970                    if not os.path.exists("%s/rpms" % tmpdir):
1971                        os.mkdir("%s/rpms" % tmpdir)
1972                    self.copy_file(r, "%s/rpms/%s" % \
1973                            (tmpdir, os.path.basename(r)))
1974                # A null experiment file in case we need to create a remote
1975                # experiment from scratch
1976                f = open("%s/null.tcl" % tmpdir, "w")
1977                print >>f, """
1978set ns [new Simulator]
1979source tb_compat.tcl
1980
1981set a [$ns node]
1982
1983$ns rtproto Session
1984$ns run
1985"""
1986                f.close()
1987
1988            except IOError, e:
1989                raise service_error(service_error.internal, 
1990                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1991
1992        except service_error, e:
1993            # If something goes wrong in the parse (usually an access error)
1994            # clear the placeholder state.  From here on out the code delays
1995            # exceptions.  Failing at this point returns a fault to the remote
1996            # caller.
1997            self.state_lock.acquire()
1998            del self.state[eid]
1999            del self.state[expid]
2000            if self.state_filename: self.write_state()
2001            self.state_lock.release()
2002            raise e
2003
2004
2005        # Start the background swapper and return the starting state.  From
2006        # here on out, the state will stick around a while.
2007
2008        # Let users touch the state
2009        self.auth.set_attribute(fid, expid)
2010        self.auth.set_attribute(expid, expid)
2011
2012        # Create a logger that logs to the experiment's state object as well as
2013        # to the main log file.
2014
2015        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2016        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2017        # XXX: there should be a global one of these rather than repeating the
2018        # code.
2019        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2020                    '%d %b %y %H:%M:%S'))
2021        alloc_log.addHandler(h)
2022       
2023
2024
2025
2026
2027        # Start a thread to do the resource allocation
2028        t  = Thread(target=self.allocate_resources,
2029                args=(allocated, master, eid, expid, expcert, tbparams, 
2030                    tmpdir, alloc_log),
2031                name=eid)
2032        t.start()
2033
2034        rv = {
2035                'experimentID': [
2036                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2037                ],
2038                'experimentStatus': 'starting',
2039                'experimentAccess': { 'X509' : expcert }
2040            }
2041
2042        return rv
2043
2044    def check_experiment_access(self, fid, key):
2045        """
2046        Confirm that the fid has access to the experiment.  Though a request
2047        may be made in terms of a local name, the access attribute is always
2048        the experiment's fedid.
2049        """
2050        if not isinstance(key, fedid):
2051            self.state_lock.acquire()
2052            if self.state.has_key(key):
2053                if isinstance(self.state[key], dict):
2054                    try:
2055                        kl = [ f['fedid'] for f in \
2056                                self.state[key]['experimentID']\
2057                                    if f.has_key('fedid') ]
2058                    except KeyError:
2059                        self.state_lock.release()
2060                        raise service_error(service_error.internal, 
2061                                "No fedid for experiment %s when checking " +\
2062                                        "access(!?)" % key)
2063                    if len(kl) == 1:
2064                        key = kl[0]
2065                    else:
2066                        self.state_lock.release()
2067                        raise service_error(service_error.internal, 
2068                                "multiple fedids for experiment %s when " +\
2069                                        "checking access(!?)" % key)
2070                elif isinstance(self.state[key], str):
2071                    self.state_lock.release()
2072                    raise service_error(service_error.internal, 
2073                            ("experiment %s is placeholder.  " +\
2074                                    "Creation in progress or aborted oddly") \
2075                                    % key)
2076                else:
2077                    self.state_lock.release()
2078                    raise service_error(service_error.internal, 
2079                            "Unexpected state for %s" % key)
2080
2081            else:
2082                self.state_lock.release()
2083                raise service_error(service_error.access, "Access Denied")
2084            self.state_lock.release()
2085
2086        if self.auth.check_attribute(fid, key):
2087            return True
2088        else:
2089            raise service_error(service_error.access, "Access Denied")
2090
2091
2092
2093    def get_vtopo(self, req, fid):
2094        """
2095        Return the stored virtual topology for this experiment
2096        """
2097        rv = None
2098        state = None
2099
2100        req = req.get('VtopoRequestBody', None)
2101        if not req:
2102            raise service_error(service_error.req,
2103                    "Bad request format (no VtopoRequestBody)")
2104        exp = req.get('experiment', None)
2105        if exp:
2106            if exp.has_key('fedid'):
2107                key = exp['fedid']
2108                keytype = "fedid"
2109            elif exp.has_key('localname'):
2110                key = exp['localname']
2111                keytype = "localname"
2112            else:
2113                raise service_error(service_error.req, "Unknown lookup type")
2114        else:
2115            raise service_error(service_error.req, "No request?")
2116
2117        self.check_experiment_access(fid, key)
2118
2119        self.state_lock.acquire()
2120        if self.state.has_key(key):
2121            if self.state[key].has_key('vtopo'):
2122                rv = { 'experiment' : {keytype: key },\
2123                        'vtopo': self.state[key]['vtopo'],\
2124                    }
2125            else:
2126                state = self.state[key]['experimentStatus']
2127        self.state_lock.release()
2128
2129        if rv: return rv
2130        else: 
2131            if state:
2132                raise service_error(service_error.partial, 
2133                        "Not ready: %s" % state)
2134            else:
2135                raise service_error(service_error.req, "No such experiment")
2136
2137    def get_vis(self, req, fid):
2138        """
2139        Return the stored visualization for this experiment
2140        """
2141        rv = None
2142        state = None
2143
2144        req = req.get('VisRequestBody', None)
2145        if not req:
2146            raise service_error(service_error.req,
2147                    "Bad request format (no VisRequestBody)")
2148        exp = req.get('experiment', None)
2149        if exp:
2150            if exp.has_key('fedid'):
2151                key = exp['fedid']
2152                keytype = "fedid"
2153            elif exp.has_key('localname'):
2154                key = exp['localname']
2155                keytype = "localname"
2156            else:
2157                raise service_error(service_error.req, "Unknown lookup type")
2158        else:
2159            raise service_error(service_error.req, "No request?")
2160
2161        self.check_experiment_access(fid, key)
2162
2163        self.state_lock.acquire()
2164        if self.state.has_key(key):
2165            if self.state[key].has_key('vis'):
2166                rv =  { 'experiment' : {keytype: key },\
2167                        'vis': self.state[key]['vis'],\
2168                        }
2169            else:
2170                state = self.state[key]['experimentStatus']
2171        self.state_lock.release()
2172
2173        if rv: return rv
2174        else:
2175            if state:
2176                raise service_error(service_error.partial, 
2177                        "Not ready: %s" % state)
2178            else:
2179                raise service_error(service_error.req, "No such experiment")
2180
2181    def clean_info_response(self, rv):
2182        """
2183        Remove the information in the experiment's state object that is not in
2184        the info response.
2185        """
2186        # Remove the owner info (should always be there, but...)
2187        if rv.has_key('owner'): del rv['owner']
2188
2189        # Convert the log into the allocationLog parameter and remove the
2190        # log entry (with defensive programming)
2191        if rv.has_key('log'):
2192            rv['allocationLog'] = "".join(rv['log'])
2193            del rv['log']
2194        else:
2195            rv['allocationLog'] = ""
2196
2197        if rv['experimentStatus'] != 'active':
2198            if rv.has_key('federant'): del rv['federant']
2199        else:
2200            # remove the allocationID info from each federant
2201            for f in rv.get('federant', []):
2202                if f.has_key('allocID'): del f['allocID']
2203
2204        return rv
2205
2206    def get_info(self, req, fid):
2207        """
2208        Return all the stored info about this experiment
2209        """
2210        rv = None
2211
2212        req = req.get('InfoRequestBody', None)
2213        if not req:
2214            raise service_error(service_error.req,
2215                    "Bad request format (no InfoRequestBody)")
2216        exp = req.get('experiment', None)
2217        if exp:
2218            if exp.has_key('fedid'):
2219                key = exp['fedid']
2220                keytype = "fedid"
2221            elif exp.has_key('localname'):
2222                key = exp['localname']
2223                keytype = "localname"
2224            else:
2225                raise service_error(service_error.req, "Unknown lookup type")
2226        else:
2227            raise service_error(service_error.req, "No request?")
2228
2229        self.check_experiment_access(fid, key)
2230
2231        # The state may be massaged by the service function that called
2232        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2233        # state.
2234        self.state_lock.acquire()
2235        if self.state.has_key(key):
2236            rv = copy.deepcopy(self.state[key])
2237        self.state_lock.release()
2238
2239        if rv:
2240            return self.clean_info_response(rv)
2241        else:
2242            raise service_error(service_error.req, "No such experiment")
2243
2244    def get_multi_info(self, req, fid):
2245        """
2246        Return all the stored info that this fedid can access
2247        """
2248        rv = { 'info': [ ] }
2249
2250        self.state_lock.acquire()
2251        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2252            self.check_experiment_access(fid, key)
2253
2254            if self.state.has_key(key):
2255                e = copy.deepcopy(self.state[key])
2256                e = self.clean_info_response(e)
2257                rv['info'].append(e)
2258        self.state_lock.release()
2259        return rv
2260
2261
2262    def terminate_experiment(self, req, fid):
2263        """
2264        Swap this experiment out on the federants and delete the shared
2265        information
2266        """
2267        tbparams = { }
2268        req = req.get('TerminateRequestBody', None)
2269        if not req:
2270            raise service_error(service_error.req,
2271                    "Bad request format (no TerminateRequestBody)")
2272        exp = req.get('experiment', None)
2273        if exp:
2274            if exp.has_key('fedid'):
2275                key = exp['fedid']
2276                keytype = "fedid"
2277            elif exp.has_key('localname'):
2278                key = exp['localname']
2279                keytype = "localname"
2280            else:
2281                raise service_error(service_error.req, "Unknown lookup type")
2282        else:
2283            raise service_error(service_error.req, "No request?")
2284
2285        self.check_experiment_access(fid, key)
2286
2287        self.state_lock.acquire()
2288        fed_exp = self.state.get(key, None)
2289
2290        if fed_exp:
2291            # This branch of the conditional holds the lock to generate a
2292            # consistent temporary tbparams variable to deallocate experiments.
2293            # It releases the lock to do the deallocations and reacquires it to
2294            # remove the experiment state when the termination is complete.
2295
2296            # First make sure that the experiment creation is complete.
2297            status = fed_exp.get('experimentStatus', None)
2298            if status:
2299                if status == 'starting':
2300                    self.state_lock.release()
2301                    raise service_error(service_error.partial, 
2302                            'Experiment still being created')
2303            else:
2304                # No status??? trouble
2305                self.state_lock.release()
2306                raise service_error(service_error.internal,
2307                        "Experiment has no status!?")
2308
2309            ids = []
2310            #  experimentID is a list of dicts that are self-describing
2311            #  identifiers.  This finds all the fedids and localnames - the
2312            #  keys of self.state - and puts them into ids.
2313            for id in fed_exp.get('experimentID', []):
2314                if id.has_key('fedid'): ids.append(id['fedid'])
2315                if id.has_key('localname'): ids.append(id['localname'])
2316
2317            # Construct enough of the tbparams to make the stop_segment calls
2318            # work
2319            for fed in fed_exp.get('federant', []):
2320                try:
2321                    for e in fed['name']:
2322                        eid = e.get('localname', None)
2323                        if eid: break
2324                    else:
2325                        continue
2326
2327                    p = fed['emulab']['project']
2328
2329                    project = p['name']['localname']
2330                    tb = p['testbed']['localname']
2331                    user = p['user'][0]['userID']['localname']
2332
2333                    domain = fed['emulab']['domain']
2334                    host  = fed['emulab']['ops']
2335                    aid = fed['allocID']
2336                except KeyError, e:
2337                    continue
2338                tbparams[tb] = {\
2339                        'user': user,\
2340                        'domain': domain,\
2341                        'project': project,\
2342                        'host': host,\
2343                        'eid': eid,\
2344                        'aid': aid,\
2345                    }
2346            self.state_lock.release()
2347
2348            # Stop everyone.
2349            thread_pool = self.thread_pool(self.nthreads)
2350            for tb in tbparams.keys():
2351                # Create and start a thread to stop the segment
2352                thread_pool.wait_for_slot()
2353                t  = self.pooled_thread(\
2354                        target=self.stop_segment(log=self.log,
2355                            keyfile=self.ssh_privkey_file, debug=self.debug), 
2356                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2357                        pdata=thread_pool, trace_file=self.trace_file)
2358                t.start()
2359            # Wait for completions
2360            thread_pool.wait_for_all_done()
2361
2362            # release the allocations (failed experiments have done this
2363            # already)
2364            if status != 'failed':
2365                for tb in tbparams.keys():
2366                    self.release_access(tb, tbparams[tb]['aid'])
2367
2368            # Remove the terminated experiment
2369            self.state_lock.acquire()
2370            for id in ids:
2371                if self.state.has_key(id): del self.state[id]
2372
2373            if self.state_filename: self.write_state()
2374            self.state_lock.release()
2375
2376            return { 'experiment': exp }
2377        else:
2378            # Don't forget to release the lock
2379            self.state_lock.release()
2380            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.