source: fedd/federation/experiment_control.py @ 3cbf9eb

version-1.30
Last change on this file since 3cbf9eb was 3cbf9eb, checked in by Ted Faber <faber@…>, 15 years ago

Adjust search order to check the expoerted project first.

  • Property mode set to 100644
File size: 91.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = RLock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = RLock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
550            """
551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
555
556            try:
557                dnull = open("/dev/null", "w")
558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
568                else:
569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
580                    else:
581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
585                else:
586                    return sub.wait() == 0
587            else:
588                if timeout == 0:
589                    self.log.debug("debug timeout raised on %s " % sh_str)
590                    raise self.ssh_cmd_timeout()
591                else:
592                    return True
593
594    class start_segment(emulab_segment):
595        def __init__(self, log=None, keyfile=None, debug=False):
596            experiment_control_local.emulab_segment.__init__(self,
597                    log=log, keyfile=keyfile, debug=debug)
598
599        def create_config_tree(self, src_dir, dest_dir, script):
600            """
601            Append commands to script that will create the directory hierarchy
602            on the remote federant.
603            """
604
605            if os.path.isdir(src_dir):
606                print >>script, "mkdir -p %s" % dest_dir
607                print >>script, "chmod 770 %s" % dest_dir
608
609                for f in os.listdir(src_dir):
610                    if os.path.isdir(f):
611                        self.create_config_tree("%s/%s" % (src_dir, f), 
612                                "%s/%s" % (dest_dir, f), script)
613            else:
614                self.log.debug("[create_config_tree]: Not a directory: %s" \
615                        % src_dir)
616
617        def ship_configs(self, host, user, src_dir, dest_dir):
618            """
619            Copy federant-specific configuration files to the federant.
620            """
621            for f in os.listdir(src_dir):
622                if os.path.isdir(f):
623                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
624                            "%s/%s" % (dest_dir, f)):
625                        return False
626                else:
627                    if not self.scp_file("%s/%s" % (src_dir, f), 
628                            user, host, dest_dir):
629                        return False
630            return True
631
632        def get_state(self, user, host, tb, pid, eid):
633            # command to test experiment state
634            expinfo_exec = "/usr/testbed/bin/expinfo" 
635            # Regular expressions to parse the expinfo response
636            state_re = re.compile("State:\s+(\w+)")
637            no_exp_re = re.compile("^No\s+such\s+experiment")
638            swapping_re = re.compile("^No\s+information\s+available.")
639            state = None    # Experiment state parsed from expinfo
640            # The expinfo ssh command.  Note the identity restriction to use
641            # only the identity provided in the pubkey given.
642            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
643                    'StrictHostKeyChecking yes', '-i', 
644                    self.ssh_privkey_file, "%s@%s" % (user, host), 
645                    expinfo_exec, pid, eid]
646
647            dev_null = None
648            try:
649                dev_null = open("/dev/null", "a")
650            except IOError, e:
651                self.log.error("[get_state]: can't open /dev/null: %s" %e)
652
653            if self.debug:
654                state = 'swapped'
655                rv = 0
656            else:
657                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
658                        close_fds=True)
659                for line in status.stdout:
660                    m = state_re.match(line)
661                    if m: state = m.group(1)
662                    else:
663                        for reg, st in ((no_exp_re, "none"),
664                                (swapping_re, "swapping")):
665                            m = reg.match(line)
666                            if m: state = st
667                rv = status.wait()
668
669            # If the experiment is not present the subcommand returns a
670            # non-zero return value.  If we successfully parsed a "none"
671            # outcome, ignore the return code.
672            if rv != 0 and state != 'none':
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            elif state not in ('active', 'swapped', 'swapping', 'none'):
677                raise service_error(service_error.internal,
678                        "Cannot get status of segment %s:%s/%s" % \
679                                (tb, pid, eid))
680            else: return state
681
682
683        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
684            """
685            Start a sub-experiment on a federant.
686
687            Get the current state, modify or create as appropriate, ship data
688            and configs and start the experiment.  There are small ordering
689            differences based on the initial state of the sub-experiment.
690            """
691            # ops node in the federant
692            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
693            user = tbparams[tb]['user']     # federant user
694            pid = tbparams[tb]['project']   # federant project
695            # XXX
696            base_confs = ( "hosts",)
697            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
698            # Configuration directories on the remote machine
699            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
700            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
701            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
702
703            state = self.get_state(user, host, tb, pid, eid)
704
705            self.log.debug("[start_segment]: %s: %s" % (tb, state))
706            self.log.info("[start_segment]:transferring experiment to %s" % tb)
707
708            if not self.scp_file("%s/%s/%s" % \
709                    (tmpdir, tb, tclfile), user, host):
710                return False
711           
712            if state == 'none':
713                # Create a null copy of the experiment so that we capture any
714                # logs there if the modify fails.  Emulab software discards the
715                # logs from a failed startexp
716                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
717                    return False
718                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
719                timedout = False
720                try:
721                    if not self.ssh_cmd(user, host,
722                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
723                            "-e %s null.tcl") % (pid, eid), "startexp",
724                            timeout=60 * 10):
725                        return False
726                except self.ssh_cmd_timeout:
727                    timedout = True
728
729                if timedout:
730                    state = self.get_state(user, host, tb, pid, eid)
731                    if state != "swapped":
732                        return False
733
734           
735            # Open up a temporary file to contain a script for setting up the
736            # filespace for the new experiment.
737            self.log.info("[start_segment]: creating script file")
738            try:
739                sf, scriptname = tempfile.mkstemp()
740                scriptfile = os.fdopen(sf, 'w')
741            except IOError:
742                return False
743
744            scriptbase = os.path.basename(scriptname)
745
746            # Script the filesystem changes
747            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
748            # Clear and create the tarfiles and rpm directories
749            for d in (tarfiles_dir, rpms_dir):
750                print >>scriptfile, "/bin/rm -rf %s/*" % d
751                print >>scriptfile, "mkdir -p %s" % d
752            print >>scriptfile, 'mkdir -p %s' % proj_dir
753            self.create_config_tree("%s/%s" % (tmpdir, tb),
754                    proj_dir, scriptfile)
755            if os.path.isdir("%s/tarfiles" % tmpdir):
756                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
757                        scriptfile)
758            if os.path.isdir("%s/rpms" % tmpdir):
759                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
760                        scriptfile)
761            print >>scriptfile, "rm -f %s" % scriptbase
762            scriptfile.close()
763
764            # Move the script to the remote machine
765            # XXX: could collide tempfile names on the remote host
766            if self.scp_file(scriptname, user, host, scriptbase):
767                os.remove(scriptname)
768            else:
769                return False
770
771            # Execute the script (and the script's last line deletes it)
772            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
773                return False
774
775            for f in base_confs:
776                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
777                        "%s/%s" % (proj_dir, f)):
778                    return False
779            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
780                    proj_dir):
781                return False
782            if os.path.isdir("%s/tarfiles" % tmpdir):
783                if not self.ship_configs(host, user,
784                        "%s/tarfiles" % tmpdir, tarfiles_dir):
785                    return False
786            if os.path.isdir("%s/rpms" % tmpdir):
787                if not self.ship_configs(host, user,
788                        "%s/rpms" % tmpdir, tarfiles_dir):
789                    return False
790            # Stage the new configuration (active experiments will stay swapped
791            # in now)
792            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
793            try:
794                if not self.ssh_cmd(user, host,
795                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
796                                (pid, eid, tclfile),
797                        "modexp", timeout= 60 * 10):
798                    return False
799            except self.ssh_cmd_timeout:
800                self.log.error("Modify command failed to complete in time")
801                # There's really no way to see if this succeeded or failed, so
802                # if it hangs, assume the worst.
803                return False
804            # Active experiments are still swapped, this swaps the others in.
805            if state != 'active':
806                self.log.info("[start_segment]: Swapping %s in on %s" % \
807                        (eid, tb))
808                timedout = False
809                try:
810                    if not self.ssh_cmd(user, host,
811                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
812                            "swapexp", timeout=10*60):
813                        return False
814                except self.ssh_cmd_timeout:
815                    timedout = True
816               
817                # If the command was terminated, but completed successfully,
818                # report success.
819                if timedout:
820                    self.log.debug("[start_segment]: swapin timed out " +\
821                            "checking state")
822                    state = self.get_state(user, host, tb, pid, eid)
823                    self.log.debug("[start_segment]: state is %s" % state)
824                    return state == 'active'
825            # Everything has gone OK.
826            return True
827
828    class stop_segment(emulab_segment):
829        def __init__(self, log=None, keyfile=None, debug=False):
830            experiment_control_local.emulab_segment.__init__(self,
831                    log=log, keyfile=keyfile, debug=debug)
832
833        def __call__(self, tb, eid, tbparams):
834            """
835            Stop a sub experiment by calling swapexp on the federant
836            """
837            user = tbparams[tb]['user']
838            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
839            pid = tbparams[tb]['project']
840
841            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
842            rv = False
843            try:
844                # Clean out tar files: we've gone over quota in the past
845                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
846                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
847                        (pid, eid))
848                rv = self.ssh_cmd(user, host,
849                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
850            except self.ssh_cmd_timeout:
851                rv = False
852            return rv
853
854       
855    def generate_ssh_keys(self, dest, type="rsa" ):
856        """
857        Generate a set of keys for the gateways to use to talk.
858
859        Keys are of type type and are stored in the required dest file.
860        """
861        valid_types = ("rsa", "dsa")
862        t = type.lower();
863        if t not in valid_types: raise ValueError
864        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
865
866        try:
867            trace = open("/dev/null", "w")
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Cannot open /dev/null??");
871
872        # May raise CalledProcessError
873        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
874        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
875        if rv != 0:
876            raise service_error(service_error.internal, 
877                    "Cannot generate nonce ssh keys.  %s return code %d" \
878                            % (self.ssh_keygen, rv))
879
880    def gentopo(self, str):
881        """
882        Generate the topology dtat structure from the splitter's XML
883        representation of it.
884
885        The topology XML looks like:
886            <experiment>
887                <nodes>
888                    <node><vname></vname><ips>ip1:ip2</ips></node>
889                </nodes>
890                <lans>
891                    <lan>
892                        <vname></vname><vnode></vnode><ip></ip>
893                        <bandwidth></bandwidth><member>node:port</member>
894                    </lan>
895                </lans>
896        """
897        class topo_parse:
898            """
899            Parse the topology XML and create the dats structure.
900            """
901            def __init__(self):
902                # Typing of the subelements for data conversion
903                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
904                self.int_subelements = ( 'bandwidth',)
905                self.float_subelements = ( 'delay',)
906                # The final data structure
907                self.nodes = [ ]
908                self.lans =  [ ]
909                self.topo = { \
910                        'node': self.nodes,\
911                        'lan' : self.lans,\
912                    }
913                self.element = { }  # Current element being created
914                self.chars = ""     # Last text seen
915
916            def end_element(self, name):
917                # After each sub element the contents is added to the current
918                # element or to the appropriate list.
919                if name == 'node':
920                    self.nodes.append(self.element)
921                    self.element = { }
922                elif name == 'lan':
923                    self.lans.append(self.element)
924                    self.element = { }
925                elif name in self.str_subelements:
926                    self.element[name] = self.chars
927                    self.chars = ""
928                elif name in self.int_subelements:
929                    self.element[name] = int(self.chars)
930                    self.chars = ""
931                elif name in self.float_subelements:
932                    self.element[name] = float(self.chars)
933                    self.chars = ""
934
935            def found_chars(self, data):
936                self.chars += data.rstrip()
937
938
939        tp = topo_parse();
940        parser = xml.parsers.expat.ParserCreate()
941        parser.EndElementHandler = tp.end_element
942        parser.CharacterDataHandler = tp.found_chars
943
944        parser.Parse(str)
945
946        return tp.topo
947       
948
949    def genviz(self, topo):
950        """
951        Generate the visualization the virtual topology
952        """
953
954        neato = "/usr/local/bin/neato"
955        # These are used to parse neato output and to create the visualization
956        # file.
957        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
958        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
959                "%s</type></node>"
960
961        try:
962            # Node names
963            nodes = [ n['vname'] for n in topo['node'] ]
964            topo_lans = topo['lan']
965        except KeyError:
966            raise service_error(service_error.internal, "Bad topology")
967
968        lans = { }
969        links = { }
970
971        # Walk through the virtual topology, organizing the connections into
972        # 2-node connections (links) and more-than-2-node connections (lans).
973        # When a lan is created, it's added to the list of nodes (there's a
974        # node in the visualization for the lan).
975        for l in topo_lans:
976            if links.has_key(l['vname']):
977                if len(links[l['vname']]) < 2:
978                    links[l['vname']].append(l['vnode'])
979                else:
980                    nodes.append(l['vname'])
981                    lans[l['vname']] = links[l['vname']]
982                    del links[l['vname']]
983                    lans[l['vname']].append(l['vnode'])
984            elif lans.has_key(l['vname']):
985                lans[l['vname']].append(l['vnode'])
986            else:
987                links[l['vname']] = [ l['vnode'] ]
988
989
990        # Open up a temporary file for dot to turn into a visualization
991        try:
992            df, dotname = tempfile.mkstemp()
993            dotfile = os.fdopen(df, 'w')
994        except IOError:
995            raise service_error(service_error.internal,
996                    "Failed to open file in genviz")
997
998        try:
999            dnull = open('/dev/null', 'w')
1000        except IOError:
1001            service_error(service_error.internal,
1002                    "Failed to open /dev/null in genviz")
1003
1004        # Generate a dot/neato input file from the links, nodes and lans
1005        try:
1006            print >>dotfile, "graph G {"
1007            for n in nodes:
1008                print >>dotfile, '\t"%s"' % n
1009            for l in links.keys():
1010                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1011            for l in lans.keys():
1012                for n in lans[l]:
1013                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1014            print >>dotfile, "}"
1015            dotfile.close()
1016        except TypeError:
1017            raise service_error(service_error.internal,
1018                    "Single endpoint link in vtopo")
1019        except IOError:
1020            raise service_error(service_error.internal, "Cannot write dot file")
1021
1022        # Use dot to create a visualization
1023        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1024                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1025                close_fds=True)
1026        dnull.close()
1027
1028        # Translate dot to vis format
1029        vis_nodes = [ ]
1030        vis = { 'node': vis_nodes }
1031        for line in dot.stdout:
1032            m = vis_re.match(line)
1033            if m:
1034                vn = m.group(1)
1035                vis_node = {'name': vn, \
1036                        'x': float(m.group(2)),\
1037                        'y' : float(m.group(3)),\
1038                    }
1039                if vn in links.keys() or vn in lans.keys():
1040                    vis_node['type'] = 'lan'
1041                else:
1042                    vis_node['type'] = 'node'
1043                vis_nodes.append(vis_node)
1044        rv = dot.wait()
1045
1046        os.remove(dotname)
1047        if rv == 0 : return vis
1048        else: return None
1049
1050    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1051            access_user):
1052        """
1053        Get access to testbed through fedd and set the parameters for that tb
1054        """
1055        uri = self.tbmap.get(tb, None)
1056        if not uri:
1057            raise service_error(serice_error.server_config, 
1058                    "Unknown testbed: %s" % tb)
1059
1060        # currently this lumps all users into one service access group
1061        service_keys = [ a for u in user \
1062                for a in u.get('access', []) \
1063                    if a.has_key('sshPubkey')]
1064
1065        if len(service_keys) == 0:
1066            raise service_error(service_error.req, 
1067                    "Must have at least one SSH pubkey for services")
1068
1069        # Tweak search order so that if there are entries in access_user that
1070        # have a project matching the export project, we try them first
1071        if export_project and export_project.has_key('localname'):
1072            pn = export_project['localname']
1073
1074            access_sequence = [ (p, u) for p, u in access_user if p == pn]
1075            access_sequence.extend([(p, u) for p, u in access_user if p != pn])
1076        else:
1077            access_sequence = access_user
1078
1079        for p, u in access_sequence:
1080            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1081                    "to %s") %  ((p or "None"), u, uri))
1082
1083            if p:
1084                # Request with user and project specified
1085                req = {\
1086                        'destinationTestbed' : { 'uri' : uri },
1087                        'project': { 
1088                            'name': {'localname': p},
1089                            'user': [ {'userID': { 'localname': u } } ],
1090                            },
1091                        'user':  user,
1092                        'allocID' : { 'localname': 'test' },
1093                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1094                        'serviceAccess' : service_keys
1095                    }
1096            else:
1097                # Request with only user specified
1098                req = {\
1099                        'destinationTestbed' : { 'uri' : uri },
1100                        'user':  [ {'userID': { 'localname': u } } ],
1101                        'allocID' : { 'localname': 'test' },
1102                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1103                        'serviceAccess' : service_keys
1104                    }
1105
1106            if tb == master:
1107                # NB, the export_project parameter is a dict that includes
1108                # the type
1109                req['exportProject'] = export_project
1110
1111            # node resources if any
1112            if nodes != None and len(nodes) > 0:
1113                rnodes = [ ]
1114                for n in nodes:
1115                    rn = { }
1116                    image, hw, count = n.split(":")
1117                    if image: rn['image'] = [ image ]
1118                    if hw: rn['hardware'] = [ hw ]
1119                    if count and int(count) >0 : rn['count'] = int(count)
1120                    rnodes.append(rn)
1121                req['resources']= { }
1122                req['resources']['node'] = rnodes
1123
1124            try:
1125                if self.local_access.has_key(uri):
1126                    # Local access call
1127                    req = { 'RequestAccessRequestBody' : req }
1128                    r = self.local_access[uri].RequestAccess(req, 
1129                            fedid(file=self.cert_file))
1130                    r = { 'RequestAccessResponseBody' : r }
1131                else:
1132                    r = self.call_RequestAccess(uri, req, 
1133                            self.cert_file, self.cert_pwd, self.trusted_certs)
1134            except service_error, e:
1135                if e.code == service_error.access:
1136                    self.log.debug("[get_access] Access denied")
1137                    r = None
1138                    continue
1139                else:
1140                    raise e
1141
1142            if r.has_key('RequestAccessResponseBody'):
1143                # Through to here we have a valid response, not a fault.
1144                # Access denied is a fault, so something better or worse than
1145                # access denied has happened.
1146                r = r['RequestAccessResponseBody']
1147                self.log.debug("[get_access] Access granted")
1148                break
1149            else:
1150                raise service_error(service_error.protocol,
1151                        "Bad proxy response")
1152       
1153        if not r:
1154            raise service_error(service_error.access, 
1155                    "Access denied by %s (%s)" % (tb, uri))
1156
1157        e = r['emulab']
1158        p = e['project']
1159        tbparam[tb] = { 
1160                "boss": e['boss'],
1161                "host": e['ops'],
1162                "domain": e['domain'],
1163                "fs": e['fileServer'],
1164                "eventserver": e['eventServer'],
1165                "project": unpack_id(p['name']),
1166                "emulab" : e,
1167                "allocID" : r['allocID'],
1168                }
1169        # Make the testbed name be the label the user applied
1170        p['testbed'] = {'localname': tb }
1171
1172        for u in p['user']:
1173            role = u.get('role', None)
1174            if role == 'experimentCreation':
1175                tbparam[tb]['user'] = unpack_id(u['userID'])
1176                break
1177        else:
1178            raise service_error(service_error.internal, 
1179                    "No createExperimentUser from %s" %tb)
1180
1181        # Add attributes to barameter space.  We don't allow attributes to
1182        # overlay any parameters already installed.
1183        for a in e['fedAttr']:
1184            try:
1185                if a['attribute'] and isinstance(a['attribute'], basestring)\
1186                        and not tbparam[tb].has_key(a['attribute'].lower()):
1187                    tbparam[tb][a['attribute'].lower()] = a['value']
1188            except KeyError:
1189                self.log.error("Bad attribute in response: %s" % a)
1190       
1191    def release_access(self, tb, aid):
1192        """
1193        Release access to testbed through fedd
1194        """
1195
1196        uri = self.tbmap.get(tb, None)
1197        if not uri:
1198            raise service_error(serice_error.server_config, 
1199                    "Unknown testbed: %s" % tb)
1200
1201        if self.local_access.has_key(uri):
1202            resp = self.local_access[uri].ReleaseAccess(\
1203                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1204                    fedid(file=self.cert_file))
1205            resp = { 'ReleaseAccessResponseBody': resp } 
1206        else:
1207            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1208                    self.cert_file, self.cert_pwd, self.trusted_certs)
1209
1210        # better error coding
1211
1212    def remote_splitter(self, uri, desc, master):
1213
1214        req = {
1215                'description' : { 'ns2description': desc },
1216                'master': master,
1217                'include_fedkit': bool(self.fedkit),
1218                'include_gatewaykit': bool(self.gatewaykit)
1219            }
1220
1221        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1222                self.trusted_certs)
1223
1224        if r.has_key('Ns2SplitResponseBody'):
1225            r = r['Ns2SplitResponseBody']
1226            if r.has_key('output'):
1227                return r['output'].splitlines()
1228            else:
1229                raise service_error(service_error.protocol, 
1230                        "Bad splitter response (no output)")
1231        else:
1232            raise service_error(service_error.protocol, "Bad splitter response")
1233       
1234    class current_testbed:
1235        """
1236        Object for collecting the current testbed description.  The testbed
1237        description is saved to a file with the local testbed variables
1238        subsittuted line by line.
1239        """
1240        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1241            def tar_list_to_string(tl):
1242                if tl is None: return None
1243
1244                rv = ""
1245                for t in tl:
1246                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1247                            (t[0], os.path.basename(t[1]))
1248                return rv
1249
1250
1251            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1252            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1253            self.current_testbed = None
1254            self.testbed_file = None
1255
1256            self.def_expstart = \
1257                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1258            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1259            self.def_gwstart = \
1260                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1261            self.def_mgwstart = \
1262                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1263            self.def_gwimage = "FBSD61-TUNNEL2";
1264            self.def_gwtype = "pc";
1265            self.def_mgwcmd = '# '
1266            self.def_mgwcmdparams = ''
1267            self.def_gwcmd = '# '
1268            self.def_gwcmdparams = ''
1269
1270            self.eid = eid
1271            self.tmpdir = tmpdir
1272            # Convert fedkit and gateway kit (which are lists of tuples) into a
1273            # substituition string.
1274            self.fedkit = tar_list_to_string(fedkit)
1275            self.gatewaykit = tar_list_to_string(gatewaykit)
1276
1277        def __call__(self, line, master, allocated, tbparams):
1278            # Capture testbed topology descriptions
1279            if self.current_testbed == None:
1280                m = self.begin_testbed.match(line)
1281                if m != None:
1282                    self.current_testbed = m.group(1)
1283                    if self.current_testbed == None:
1284                        raise service_error(service_error.req,
1285                                "Bad request format (unnamed testbed)")
1286                    allocated[self.current_testbed] = \
1287                            allocated.get(self.current_testbed,0) + 1
1288                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1289                    if not os.path.exists(tb_dir):
1290                        try:
1291                            os.mkdir(tb_dir)
1292                        except IOError:
1293                            raise service_error(service_error.internal,
1294                                    "Cannot create %s" % tb_dir)
1295                    try:
1296                        self.testbed_file = open("%s/%s.%s.tcl" %
1297                                (tb_dir, self.eid, self.current_testbed), 'w')
1298                    except IOError:
1299                        self.testbed_file = None
1300                    return True
1301                else: return False
1302            else:
1303                m = self.end_testbed.match(line)
1304                if m != None:
1305                    if m.group(1) != self.current_testbed:
1306                        raise service_error(service_error.internal, 
1307                                "Mismatched testbed markers!?")
1308                    if self.testbed_file != None: 
1309                        self.testbed_file.close()
1310                        self.testbed_file = None
1311                    self.current_testbed = None
1312                elif self.testbed_file:
1313                    # Substitute variables and put the line into the local
1314                    # testbed file.
1315                    gwtype = tbparams[self.current_testbed].get(\
1316                            'connectortype', self.def_gwtype)
1317                    gwimage = tbparams[self.current_testbed].get(\
1318                            'connectorimage', self.def_gwimage)
1319                    mgwstart = tbparams[self.current_testbed].get(\
1320                            'masterconnectorstartcmd', self.def_mgwstart)
1321                    mexpstart = tbparams[self.current_testbed].get(\
1322                            'masternodestartcmd', self.def_mexpstart)
1323                    gwstart = tbparams[self.current_testbed].get(\
1324                            'slaveconnectorstartcmd', self.def_gwstart)
1325                    expstart = tbparams[self.current_testbed].get(\
1326                            'slavenodestartcmd', self.def_expstart)
1327                    project = tbparams[self.current_testbed].get('project')
1328                    gwcmd = tbparams[self.current_testbed].get(\
1329                            'slaveconnectorcmd', self.def_gwcmd)
1330                    gwcmdparams = tbparams[self.current_testbed].get(\
1331                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1332                    mgwcmd = tbparams[self.current_testbed].get(\
1333                            'masterconnectorcmd', self.def_gwcmd)
1334                    mgwcmdparams = tbparams[self.current_testbed].get(\
1335                            'masterconnectorcmdparams', self.def_gwcmdparams)
1336                    line = re.sub("GWTYPE", gwtype, line)
1337                    line = re.sub("GWIMAGE", gwimage, line)
1338                    if self.current_testbed == master:
1339                        line = re.sub("GWSTART", mgwstart, line)
1340                        line = re.sub("EXPSTART", mexpstart, line)
1341                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1342                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1343                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1344                    else:
1345                        line = re.sub("GWSTART", gwstart, line)
1346                        line = re.sub("EXPSTART", expstart, line)
1347                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1348                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1349                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1350                    #These expansions contain EID and PROJDIR.  NB these are
1351                    # local fedkit and gatewaykit, which are strings.
1352                    if self.fedkit:
1353                        line = re.sub("FEDKIT", self.fedkit, line)
1354                    if self.gatewaykit:
1355                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1356                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1357                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1358                    line = re.sub("EID", self.eid, line)
1359                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1360                            (project, self.eid), line)
1361                    print >>self.testbed_file, line
1362                return True
1363
1364    class allbeds:
1365        """
1366        Process the Allbeds section.  Get access to each federant and save the
1367        parameters in tbparams
1368        """
1369        def __init__(self, get_access):
1370            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1371            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1372            self.in_allbeds = False
1373            self.get_access = get_access
1374
1375        def __call__(self, line, user, tbparams, master, export_project,
1376                access_user):
1377            # Testbed access parameters
1378            if not self.in_allbeds:
1379                if self.begin_allbeds.match(line):
1380                    self.in_allbeds = True
1381                    return True
1382                else:
1383                    return False
1384            else:
1385                if self.end_allbeds.match(line):
1386                    self.in_allbeds = False
1387                else:
1388                    nodes = line.split('|')
1389                    tb = nodes.pop(0)
1390                    self.get_access(tb, nodes, user, tbparams, master,
1391                            export_project, access_user)
1392                return True
1393
1394    class gateways:
1395        def __init__(self, eid, master, tmpdir, gw_pubkey,
1396                gw_secretkey, copy_file, fedkit):
1397            self.begin_gateways = \
1398                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1399            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1400            self.current_gateways = None
1401            self.control_gateway = None
1402            self.active_end = { }
1403
1404            self.eid = eid
1405            self.master = master
1406            self.tmpdir = tmpdir
1407            self.gw_pubkey_base = gw_pubkey
1408            self.gw_secretkey_base = gw_secretkey
1409
1410            self.copy_file = copy_file
1411            self.fedkit = fedkit
1412
1413
1414        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1415                active_end, tbparams, dtb, myname, desthost, type):
1416            """
1417            Produce a gateway configuration file from a gateways line.
1418            """
1419
1420            sproject = tbparams[gw].get('project', 'project')
1421            dproject = tbparams[dtb].get('project', 'project')
1422            sdomain = ".%s.%s%s" % (eid, sproject,
1423                    tbparams[gw].get('domain', ".example.com"))
1424            ddomain = ".%s.%s%s" % (eid, dproject,
1425                    tbparams[dtb].get('domain', ".example.com"))
1426            boss = tbparams[master].get('boss', "boss")
1427            fs = tbparams[master].get('fs', "fs")
1428            event_server = "%s%s" % \
1429                    (tbparams[gw].get('eventserver', "event_server"),
1430                            tbparams[gw].get('domain', "example.com"))
1431            remote_event_server = "%s%s" % \
1432                    (tbparams[dtb].get('eventserver', "event_server"),
1433                            tbparams[dtb].get('domain', "example.com"))
1434            seer_control = "%s%s" % \
1435                    (tbparams[gw].get('control', "control"), sdomain)
1436            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1437
1438            if self.fedkit:
1439                remote_script_dir = "/usr/local/federation/bin"
1440                local_script_dir = "/usr/local/federation/bin"
1441            else:
1442                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1443                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1444
1445            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1446            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1447            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1448
1449            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1450            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1451
1452            # translate to lower case so the `hostname` hack for specifying
1453            # configuration files works.
1454            conf_file = conf_file.lower();
1455            remote_conf_file = remote_conf_file.lower();
1456
1457            if dtb == master:
1458                active = "false"
1459            elif gw == master:
1460                active = "true"
1461            elif active_end.has_key('%s-%s' % (dtb, gw)):
1462                active = "false"
1463            else:
1464                active_end['%s-%s' % (gw, dtb)] = 1
1465                active = "true"
1466
1467            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1468            print >>gwconfig, "Active: %s" % active
1469            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1470            if tunnel_iface:
1471                print >>gwconfig, "Interface: %s" % tunnel_iface
1472            print >>gwconfig, "BossName: %s" % boss
1473            print >>gwconfig, "FsName: %s" % fs
1474            print >>gwconfig, "EventServerName: %s" % event_server
1475            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1476            print >>gwconfig, "SeerControl: %s" % seer_control
1477            print >>gwconfig, "Type: %s" % type
1478            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1479            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1480                    local_script_dir
1481            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1482            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1483            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1484                    (remote_conf_dir, remote_conf_file)
1485            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1486            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1487            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1488            gwconfig.close()
1489
1490            return active == "true"
1491
1492        def __call__(self, line, allocated, tbparams):
1493            # Process gateways
1494            if not self.current_gateways:
1495                m = self.begin_gateways.match(line)
1496                if m:
1497                    self.current_gateways = m.group(1)
1498                    if allocated.has_key(self.current_gateways):
1499                        # This test should always succeed
1500                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1501                        if not os.path.exists(tb_dir):
1502                            try:
1503                                os.mkdir(tb_dir)
1504                            except IOError:
1505                                raise service_error(service_error.internal,
1506                                        "Cannot create %s" % tb_dir)
1507                    else:
1508                        # XXX
1509                        self.log.error("[gateways]: Ignoring gateways for " + \
1510                                "unknown testbed %s" % self.current_gateways)
1511                        self.current_gateways = None
1512                    return True
1513                else:
1514                    return False
1515            else:
1516                m = self.end_gateways.match(line)
1517                if m :
1518                    if m.group(1) != self.current_gateways:
1519                        raise service_error(service_error.internal,
1520                                "Mismatched gateway markers!?")
1521                    if self.control_gateway:
1522                        try:
1523                            cc = open("%s/%s/client.conf" %
1524                                    (self.tmpdir, self.current_gateways), 'w')
1525                            print >>cc, "ControlGateway: %s" % \
1526                                    self.control_gateway
1527                            if tbparams[self.master].has_key('smbshare'):
1528                                print >>cc, "SMBSHare: %s" % \
1529                                        tbparams[self.master]['smbshare']
1530                            print >>cc, "ProjectUser: %s" % \
1531                                    tbparams[self.master]['user']
1532                            print >>cc, "ProjectName: %s" % \
1533                                    tbparams[self.master]['project']
1534                            print >>cc, "ExperimentID: %s/%s" % \
1535                                    ( tbparams[self.master]['project'], \
1536                                    self.eid )
1537                            cc.close()
1538                        except IOError:
1539                            raise service_error(service_error.internal,
1540                                    "Error creating client config")
1541                        # XXX: This seer specific file should disappear
1542                        try:
1543                            cc = open("%s/%s/seer.conf" %
1544                                    (self.tmpdir, self.current_gateways),
1545                                    'w')
1546                            if self.current_gateways != self.master:
1547                                print >>cc, "ControlNode: %s" % \
1548                                        self.control_gateway
1549                            print >>cc, "ExperimentID: %s/%s" % \
1550                                    ( tbparams[self.master]['project'], \
1551                                    self.eid )
1552                            cc.close()
1553                        except IOError:
1554                            raise service_error(service_error.internal,
1555                                    "Error creating seer config")
1556                    else:
1557                        debug.error("[gateways]: No control gateway for %s" %\
1558                                    self.current_gateways)
1559                    self.current_gateways = None
1560                else:
1561                    dtb, myname, desthost, type = line.split(" ")
1562
1563                    if type == "control" or type == "both":
1564                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1565                                self.eid, 
1566                                tbparams[self.current_gateways]['project'],
1567                                tbparams[self.current_gateways]['domain'])
1568                    try:
1569                        active = self.gateway_conf_file(self.current_gateways,
1570                                self.master, self.eid, self.gw_pubkey_base,
1571                                self.gw_secretkey_base,
1572                                self.active_end, tbparams, dtb, myname,
1573                                desthost, type)
1574                    except IOError, e:
1575                        raise service_error(service_error.internal,
1576                                "Failed to write config file for %s" % \
1577                                        self.current_gateway)
1578           
1579                    gw_pubkey = "%s/keys/%s" % \
1580                            (self.tmpdir, self.gw_pubkey_base)
1581                    gw_secretkey = "%s/keys/%s" % \
1582                            (self.tmpdir, self.gw_secretkey_base)
1583
1584                    pkfile = "%s/%s/%s" % \
1585                            ( self.tmpdir, self.current_gateways, 
1586                                    self.gw_pubkey_base)
1587                    skfile = "%s/%s/%s" % \
1588                            ( self.tmpdir, self.current_gateways, 
1589                                    self.gw_secretkey_base)
1590
1591                    if not os.path.exists(pkfile):
1592                        try:
1593                            self.copy_file(gw_pubkey, pkfile)
1594                        except IOError:
1595                            service_error(service_error.internal,
1596                                    "Failed to copy pubkey file")
1597
1598                    if active and not os.path.exists(skfile):
1599                        try:
1600                            self.copy_file(gw_secretkey, skfile)
1601                        except IOError:
1602                            service_error(service_error.internal,
1603                                    "Failed to copy secretkey file")
1604                return True
1605
1606    class shunt_to_file:
1607        """
1608        Simple class to write data between two regexps to a file.
1609        """
1610        def __init__(self, begin, end, filename):
1611            """
1612            Begin shunting on a match of begin, stop on end, send data to
1613            filename.
1614            """
1615            self.begin = re.compile(begin)
1616            self.end = re.compile(end)
1617            self.in_shunt = False
1618            self.file = None
1619            self.filename = filename
1620
1621        def __call__(self, line):
1622            """
1623            Call this on each line in the input that may be shunted.
1624            """
1625            if not self.in_shunt:
1626                if self.begin.match(line):
1627                    self.in_shunt = True
1628                    try:
1629                        self.file = open(self.filename, "w")
1630                    except:
1631                        self.file = None
1632                        raise
1633                    return True
1634                else:
1635                    return False
1636            else:
1637                if self.end.match(line):
1638                    if self.file: 
1639                        self.file.close()
1640                        self.file = None
1641                    self.in_shunt = False
1642                else:
1643                    if self.file:
1644                        print >>self.file, line
1645                return True
1646
1647    class shunt_to_list:
1648        """
1649        Same interface as shunt_to_file.  Data collected in self.list, one list
1650        element per line.
1651        """
1652        def __init__(self, begin, end):
1653            self.begin = re.compile(begin)
1654            self.end = re.compile(end)
1655            self.in_shunt = False
1656            self.list = [ ]
1657       
1658        def __call__(self, line):
1659            if not self.in_shunt:
1660                if self.begin.match(line):
1661                    self.in_shunt = True
1662                    return True
1663                else:
1664                    return False
1665            else:
1666                if self.end.match(line):
1667                    self.in_shunt = False
1668                else:
1669                    self.list.append(line)
1670                return True
1671
1672    class shunt_to_string:
1673        """
1674        Same interface as shunt_to_file.  Data collected in self.str, all in
1675        one string.
1676        """
1677        def __init__(self, begin, end):
1678            self.begin = re.compile(begin)
1679            self.end = re.compile(end)
1680            self.in_shunt = False
1681            self.str = ""
1682       
1683        def __call__(self, line):
1684            if not self.in_shunt:
1685                if self.begin.match(line):
1686                    self.in_shunt = True
1687                    return True
1688                else:
1689                    return False
1690            else:
1691                if self.end.match(line):
1692                    self.in_shunt = False
1693                else:
1694                    self.str += line
1695                return True
1696
1697    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1698            tbparams, tmpdir, alloc_log=None):
1699        started = { }           # Testbeds where a sub-experiment started
1700                                # successfully
1701
1702        # XXX
1703        fail_soft = False
1704
1705        log = alloc_log or self.log
1706
1707        thread_pool = self.thread_pool(self.nthreads)
1708        threads = [ ]
1709
1710        for tb in [ k for k in allocated.keys() if k != master]:
1711            # Create and start a thread to start the segment, and save it to
1712            # get the return value later
1713            thread_pool.wait_for_slot()
1714            t  = self.pooled_thread(\
1715                    target=self.start_segment(log=log,
1716                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1717                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1718                    pdata=thread_pool, trace_file=self.trace_file)
1719            threads.append(t)
1720            t.start()
1721
1722        # Wait until all finish
1723        thread_pool.wait_for_all_done()
1724
1725        # If none failed, start the master
1726        failed = [ t.getName() for t in threads if not t.rv ]
1727
1728        if len(failed) == 0:
1729            starter = self.start_segment(log=log, 
1730                    keyfile=self.ssh_privkey_file, debug=self.debug)
1731            if not starter(master, eid, tbparams, tmpdir):
1732                failed.append(master)
1733
1734        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1735        # If one failed clean up, unless fail_soft is set
1736        if failed:
1737            if not fail_soft:
1738                thread_pool.clear()
1739                for tb in succeeded:
1740                    # Create and start a thread to stop the segment
1741                    thread_pool.wait_for_slot()
1742                    t  = self.pooled_thread(\
1743                            target=self.stop_segment(log=log,
1744                                keyfile=self.ssh_privkey_file,
1745                                debug=self.debug), 
1746                            args=(tb, eid, tbparams), name=tb,
1747                            pdata=thread_pool, trace_file=self.trace_file)
1748                    t.start()
1749                # Wait until all finish
1750                thread_pool.wait_for_all_done()
1751
1752                # release the allocations
1753                for tb in tbparams.keys():
1754                    self.release_access(tb, tbparams[tb]['allocID'])
1755                # Remove the placeholder
1756                self.state_lock.acquire()
1757                self.state[eid]['experimentStatus'] = 'failed'
1758                if self.state_filename: self.write_state()
1759                self.state_lock.release()
1760
1761                #raise service_error(service_error.federant,
1762                #    "Swap in failed on %s" % ",".join(failed))
1763                log.error("Swap in failed on %s" % ",".join(failed))
1764                return
1765        else:
1766            log.info("[start_segment]: Experiment %s active" % eid)
1767
1768        log.debug("[start_experiment]: removing %s" % tmpdir)
1769
1770        # Walk up tmpdir, deleting as we go
1771        for path, dirs, files in os.walk(tmpdir, topdown=False):
1772            for f in files:
1773                os.remove(os.path.join(path, f))
1774            for d in dirs:
1775                os.rmdir(os.path.join(path, d))
1776        os.rmdir(tmpdir)
1777
1778        # Insert the experiment into our state and update the disk copy
1779        self.state_lock.acquire()
1780        self.state[expid]['experimentStatus'] = 'active'
1781        self.state[eid] = self.state[expid]
1782        if self.state_filename: self.write_state()
1783        self.state_lock.release()
1784        return
1785
1786    def create_experiment(self, req, fid):
1787        """
1788        The external interface to experiment creation called from the
1789        dispatcher.
1790
1791        Creates a working directory, splits the incoming description using the
1792        splitter script and parses out the avrious subsections using the
1793        lcasses above.  Once each sub-experiment is created, use pooled threads
1794        to instantiate them and start it all up.
1795        """
1796
1797        if not self.auth.check_attribute(fid, 'create'):
1798            raise service_error(service_error.access, "Create access denied")
1799
1800        try:
1801            tmpdir = tempfile.mkdtemp(prefix="split-")
1802        except IOError:
1803            raise service_error(service_error.internal, "Cannot create tmp dir")
1804
1805        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1806        gw_secretkey_base = "fed.%s" % self.ssh_type
1807        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1808        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1809        tclfile = tmpdir + "/experiment.tcl"
1810        tbparams = { }
1811        try:
1812            access_user = self.accessdb[fid]
1813        except KeyError:
1814            raise service_error(service_error.internal,
1815                    "Access map and authorizer out of sync in " + \
1816                            "create_experiment for fedid %s"  % fid)
1817
1818        pid = "dummy"
1819        gid = "dummy"
1820        try:
1821            os.mkdir(tmpdir+"/keys")
1822        except OSError:
1823            raise service_error(service_error.internal,
1824                    "Can't make temporary dir")
1825
1826        req = req.get('CreateRequestBody', None)
1827        if not req:
1828            raise service_error(service_error.req,
1829                    "Bad request format (no CreateRequestBody)")
1830        # The tcl parser needs to read a file so put the content into that file
1831        descr=req.get('experimentdescription', None)
1832        if descr:
1833            file_content=descr.get('ns2description', None)
1834            if file_content:
1835                try:
1836                    f = open(tclfile, 'w')
1837                    f.write(file_content)
1838                    f.close()
1839                except IOError:
1840                    raise service_error(service_error.internal,
1841                            "Cannot write temp experiment description")
1842            else:
1843                raise service_error(service_error.req, 
1844                        "Only ns2descriptions supported")
1845        else:
1846            raise service_error(service_error.req, "No experiment description")
1847
1848        # Generate an ID for the experiment (slice) and a certificate that the
1849        # allocator can use to prove they own it.  We'll ship it back through
1850        # the encrypted connection.
1851        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1852
1853        if req.has_key('experimentID') and \
1854                req['experimentID'].has_key('localname'):
1855            overwrite = False
1856            eid = req['experimentID']['localname']
1857            # If there's an old failed experiment here with the same local name
1858            # and accessible by this user, we'll overwrite it, otherwise we'll
1859            # fall through and do the collision avoidance.
1860            old_expid = self.get_experiment_fedid(eid)
1861            if old_expid and self.check_experiment_access(fid, old_expid):
1862                self.state_lock.acquire()
1863                status = self.state[eid].get('experimentStatus', None)
1864                if status and status == 'failed':
1865                    # remove the old access attribute
1866                    self.auth.unset_attribute(fid, old_expid)
1867                    overwrite = True
1868                    del self.state[eid]
1869                    del self.state[old_expid]
1870                self.state_lock.release()
1871            self.state_lock.acquire()
1872            while (self.state.has_key(eid) and not overwrite):
1873                eid += random.choice(string.ascii_letters)
1874            # Initial state
1875            self.state[eid] = {
1876                    'experimentID' : \
1877                            [ { 'localname' : eid }, {'fedid': expid } ],
1878                    'experimentStatus': 'starting',
1879                    'experimentAccess': { 'X509' : expcert },
1880                    'owner': fid,
1881                    'log' : [],
1882                }
1883            self.state[expid] = self.state[eid]
1884            if self.state_filename: self.write_state()
1885            self.state_lock.release()
1886        else:
1887            eid = self.exp_stem
1888            for i in range(0,5):
1889                eid += random.choice(string.ascii_letters)
1890            self.state_lock.acquire()
1891            while (self.state.has_key(eid)):
1892                eid = self.exp_stem
1893                for i in range(0,5):
1894                    eid += random.choice(string.ascii_letters)
1895            # Initial state
1896            self.state[eid] = {
1897                    'experimentID' : \
1898                            [ { 'localname' : eid }, {'fedid': expid } ],
1899                    'experimentStatus': 'starting',
1900                    'experimentAccess': { 'X509' : expcert },
1901                    'owner': fid,
1902                    'log' : [],
1903                }
1904            self.state[expid] = self.state[eid]
1905            if self.state_filename: self.write_state()
1906            self.state_lock.release()
1907
1908        try: 
1909            # This catches exceptions to clear the placeholder if necessary
1910            try:
1911                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1912            except ValueError:
1913                raise service_error(service_error.server_config, 
1914                        "Bad key type (%s)" % self.ssh_type)
1915
1916            user = req.get('user', None)
1917            if user == None:
1918                raise service_error(service_error.req, "No user")
1919
1920            master = req.get('master', None)
1921            if not master:
1922                raise service_error(service_error.req,
1923                        "No master testbed label")
1924            export_project = req.get('exportProject', None)
1925            if not export_project:
1926                raise service_error(service_error.req, "No export project")
1927           
1928            if self.splitter_url:
1929                self.log.debug("Calling remote splitter at %s" % \
1930                        self.splitter_url)
1931                split_data = self.remote_splitter(self.splitter_url,
1932                        file_content, master)
1933            else:
1934                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1935                    str(self.muxmax), '-m', master]
1936
1937                if self.fedkit:
1938                    tclcmd.append('-k')
1939
1940                if self.gatewaykit:
1941                    tclcmd.append('-K')
1942
1943                tclcmd.extend([pid, gid, eid, tclfile])
1944
1945                self.log.debug("running local splitter %s", " ".join(tclcmd))
1946                # This is just fantastic.  As a side effect the parser copies
1947                # tb_compat.tcl into the current directory, so that directory
1948                # must be writable by the fedd user.  Doing this in the
1949                # temporary subdir ensures this is the case.
1950                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1951                        cwd=tmpdir)
1952                split_data = tclparser.stdout
1953
1954            allocated = { }         # Testbeds we can access
1955            # Objects to parse the splitter output (defined above)
1956            parse_current_testbed = self.current_testbed(eid, tmpdir,
1957                    self.fedkit, self.gatewaykit)
1958            parse_allbeds = self.allbeds(self.get_access)
1959            parse_gateways = self.gateways(eid, master, tmpdir,
1960                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1961                    self.fedkit)
1962            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1963                        "^#\s+End\s+Vtopo")
1964            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1965                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1966            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1967                    "^#\s+End\s+tarfiles")
1968            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1969                    "^#\s+End\s+rpms")
1970
1971            # Working on the split data
1972            for line in split_data:
1973                line = line.rstrip()
1974                if parse_current_testbed(line, master, allocated, tbparams):
1975                    continue
1976                elif parse_allbeds(line, user, tbparams, master, export_project,
1977                        access_user):
1978                    continue
1979                elif parse_gateways(line, allocated, tbparams):
1980                    continue
1981                elif parse_vtopo(line):
1982                    continue
1983                elif parse_hostnames(line):
1984                    continue
1985                elif parse_tarfiles(line):
1986                    continue
1987                elif parse_rpms(line):
1988                    continue
1989                else:
1990                    raise service_error(service_error.internal, 
1991                            "Bad tcl parse? %s" % line)
1992            # Virtual topology and visualization
1993            vtopo = self.gentopo(parse_vtopo.str)
1994            if not vtopo:
1995                raise service_error(service_error.internal, 
1996                        "Failed to generate virtual topology")
1997
1998            vis = self.genviz(vtopo)
1999            if not vis:
2000                raise service_error(service_error.internal, 
2001                        "Failed to generate visualization")
2002
2003           
2004            # save federant information
2005            for k in allocated.keys():
2006                tbparams[k]['federant'] = {\
2007                        'name': [ { 'localname' : eid} ],\
2008                        'emulab': tbparams[k]['emulab'],\
2009                        'allocID' : tbparams[k]['allocID'],\
2010                        'master' : k == master,\
2011                    }
2012
2013            self.state_lock.acquire()
2014            self.state[eid]['vtopo'] = vtopo
2015            self.state[eid]['vis'] = vis
2016            self.state[expid]['federant'] = \
2017                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2018                        if tbparams[tb].has_key('federant') ]
2019            if self.state_filename: self.write_state()
2020            self.state_lock.release()
2021
2022            # Copy tarfiles and rpms needed at remote sites into a staging area
2023            try:
2024                if self.fedkit:
2025                    for t in self.fedkit:
2026                        parse_tarfiles.list.append(t[1])
2027                if self.gatewaykit:
2028                    for t in self.gatewaykit:
2029                        parse_tarfiles.list.append(t[1])
2030                for t in parse_tarfiles.list:
2031                    if not os.path.exists("%s/tarfiles" % tmpdir):
2032                        os.mkdir("%s/tarfiles" % tmpdir)
2033                    self.copy_file(t, "%s/tarfiles/%s" % \
2034                            (tmpdir, os.path.basename(t)))
2035                for r in parse_rpms.list:
2036                    if not os.path.exists("%s/rpms" % tmpdir):
2037                        os.mkdir("%s/rpms" % tmpdir)
2038                    self.copy_file(r, "%s/rpms/%s" % \
2039                            (tmpdir, os.path.basename(r)))
2040                # A null experiment file in case we need to create a remote
2041                # experiment from scratch
2042                f = open("%s/null.tcl" % tmpdir, "w")
2043                print >>f, """
2044set ns [new Simulator]
2045source tb_compat.tcl
2046
2047set a [$ns node]
2048
2049$ns rtproto Session
2050$ns run
2051"""
2052                f.close()
2053
2054            except IOError, e:
2055                raise service_error(service_error.internal, 
2056                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2057
2058        except service_error, e:
2059            # If something goes wrong in the parse (usually an access error)
2060            # clear the placeholder state.  From here on out the code delays
2061            # exceptions.  Failing at this point returns a fault to the remote
2062            # caller.
2063            self.state_lock.acquire()
2064            del self.state[eid]
2065            del self.state[expid]
2066            if self.state_filename: self.write_state()
2067            self.state_lock.release()
2068            raise e
2069
2070
2071        # Start the background swapper and return the starting state.  From
2072        # here on out, the state will stick around a while.
2073
2074        # Let users touch the state
2075        self.auth.set_attribute(fid, expid)
2076        self.auth.set_attribute(expid, expid)
2077        # Override fedids can manipulate state as well
2078        for o in self.overrides:
2079            self.auth.set_attribute(o, expid)
2080
2081        # Create a logger that logs to the experiment's state object as well as
2082        # to the main log file.
2083        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2084        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2085        # XXX: there should be a global one of these rather than repeating the
2086        # code.
2087        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2088                    '%d %b %y %H:%M:%S'))
2089        alloc_log.addHandler(h)
2090       
2091        # Start a thread to do the resource allocation
2092        t  = Thread(target=self.allocate_resources,
2093                args=(allocated, master, eid, expid, expcert, tbparams, 
2094                    tmpdir, alloc_log),
2095                name=eid)
2096        t.start()
2097
2098        rv = {
2099                'experimentID': [
2100                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2101                ],
2102                'experimentStatus': 'starting',
2103                'experimentAccess': { 'X509' : expcert }
2104            }
2105
2106        return rv
2107   
2108    def get_experiment_fedid(self, key):
2109        """
2110        find the fedid associated with the localname key in the state database.
2111        """
2112
2113        rv = None
2114        self.state_lock.acquire()
2115        if self.state.has_key(key):
2116            if isinstance(self.state[key], dict):
2117                try:
2118                    kl = [ f['fedid'] for f in \
2119                            self.state[key]['experimentID']\
2120                                if f.has_key('fedid') ]
2121                except KeyError:
2122                    self.state_lock.release()
2123                    raise service_error(service_error.internal, 
2124                            "No fedid for experiment %s when getting "+\
2125                                    "fedid(!?)" % key)
2126                if len(kl) == 1:
2127                    rv = kl[0]
2128                else:
2129                    self.state_lock.release()
2130                    raise service_error(service_error.internal, 
2131                            "multiple fedids for experiment %s when " +\
2132                                    "getting fedid(!?)" % key)
2133            else:
2134                self.state_lock.release()
2135                raise service_error(service_error.internal, 
2136                        "Unexpected state for %s" % key)
2137        self.state_lock.release()
2138        return rv
2139
2140    def check_experiment_access(self, fid, key):
2141        """
2142        Confirm that the fid has access to the experiment.  Though a request
2143        may be made in terms of a local name, the access attribute is always
2144        the experiment's fedid.
2145        """
2146        if not isinstance(key, fedid):
2147            key = self.get_experiment_fedid(key)
2148
2149        if self.auth.check_attribute(fid, key):
2150            return True
2151        else:
2152            raise service_error(service_error.access, "Access Denied")
2153
2154
2155
2156    def get_vtopo(self, req, fid):
2157        """
2158        Return the stored virtual topology for this experiment
2159        """
2160        rv = None
2161        state = None
2162
2163        req = req.get('VtopoRequestBody', None)
2164        if not req:
2165            raise service_error(service_error.req,
2166                    "Bad request format (no VtopoRequestBody)")
2167        exp = req.get('experiment', None)
2168        if exp:
2169            if exp.has_key('fedid'):
2170                key = exp['fedid']
2171                keytype = "fedid"
2172            elif exp.has_key('localname'):
2173                key = exp['localname']
2174                keytype = "localname"
2175            else:
2176                raise service_error(service_error.req, "Unknown lookup type")
2177        else:
2178            raise service_error(service_error.req, "No request?")
2179
2180        self.check_experiment_access(fid, key)
2181
2182        self.state_lock.acquire()
2183        if self.state.has_key(key):
2184            if self.state[key].has_key('vtopo'):
2185                rv = { 'experiment' : {keytype: key },\
2186                        'vtopo': self.state[key]['vtopo'],\
2187                    }
2188            else:
2189                state = self.state[key]['experimentStatus']
2190        self.state_lock.release()
2191
2192        if rv: return rv
2193        else: 
2194            if state:
2195                raise service_error(service_error.partial, 
2196                        "Not ready: %s" % state)
2197            else:
2198                raise service_error(service_error.req, "No such experiment")
2199
2200    def get_vis(self, req, fid):
2201        """
2202        Return the stored visualization for this experiment
2203        """
2204        rv = None
2205        state = None
2206
2207        req = req.get('VisRequestBody', None)
2208        if not req:
2209            raise service_error(service_error.req,
2210                    "Bad request format (no VisRequestBody)")
2211        exp = req.get('experiment', None)
2212        if exp:
2213            if exp.has_key('fedid'):
2214                key = exp['fedid']
2215                keytype = "fedid"
2216            elif exp.has_key('localname'):
2217                key = exp['localname']
2218                keytype = "localname"
2219            else:
2220                raise service_error(service_error.req, "Unknown lookup type")
2221        else:
2222            raise service_error(service_error.req, "No request?")
2223
2224        self.check_experiment_access(fid, key)
2225
2226        self.state_lock.acquire()
2227        if self.state.has_key(key):
2228            if self.state[key].has_key('vis'):
2229                rv =  { 'experiment' : {keytype: key },\
2230                        'vis': self.state[key]['vis'],\
2231                        }
2232            else:
2233                state = self.state[key]['experimentStatus']
2234        self.state_lock.release()
2235
2236        if rv: return rv
2237        else:
2238            if state:
2239                raise service_error(service_error.partial, 
2240                        "Not ready: %s" % state)
2241            else:
2242                raise service_error(service_error.req, "No such experiment")
2243
2244    def clean_info_response(self, rv):
2245        """
2246        Remove the information in the experiment's state object that is not in
2247        the info response.
2248        """
2249        # Remove the owner info (should always be there, but...)
2250        if rv.has_key('owner'): del rv['owner']
2251
2252        # Convert the log into the allocationLog parameter and remove the
2253        # log entry (with defensive programming)
2254        if rv.has_key('log'):
2255            rv['allocationLog'] = "".join(rv['log'])
2256            del rv['log']
2257        else:
2258            rv['allocationLog'] = ""
2259
2260        if rv['experimentStatus'] != 'active':
2261            if rv.has_key('federant'): del rv['federant']
2262        else:
2263            # remove the allocationID info from each federant
2264            for f in rv.get('federant', []):
2265                if f.has_key('allocID'): del f['allocID']
2266        return rv
2267
2268    def get_info(self, req, fid):
2269        """
2270        Return all the stored info about this experiment
2271        """
2272        rv = None
2273
2274        req = req.get('InfoRequestBody', None)
2275        if not req:
2276            raise service_error(service_error.req,
2277                    "Bad request format (no InfoRequestBody)")
2278        exp = req.get('experiment', None)
2279        if exp:
2280            if exp.has_key('fedid'):
2281                key = exp['fedid']
2282                keytype = "fedid"
2283            elif exp.has_key('localname'):
2284                key = exp['localname']
2285                keytype = "localname"
2286            else:
2287                raise service_error(service_error.req, "Unknown lookup type")
2288        else:
2289            raise service_error(service_error.req, "No request?")
2290
2291        self.check_experiment_access(fid, key)
2292
2293        # The state may be massaged by the service function that called
2294        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2295        # state.
2296        self.state_lock.acquire()
2297        if self.state.has_key(key):
2298            rv = copy.deepcopy(self.state[key])
2299        self.state_lock.release()
2300
2301        if rv:
2302            return self.clean_info_response(rv)
2303        else:
2304            raise service_error(service_error.req, "No such experiment")
2305
2306    def get_multi_info(self, req, fid):
2307        """
2308        Return all the stored info that this fedid can access
2309        """
2310        rv = { 'info': [ ] }
2311
2312        self.state_lock.acquire()
2313        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2314            try:
2315                self.check_experiment_access(fid, key)
2316            except service_error, e:
2317                if e.code == service_error.access:
2318                    continue
2319                else:
2320                    self.state_lock.release()
2321                    raise e
2322
2323            if self.state.has_key(key):
2324                e = copy.deepcopy(self.state[key])
2325                e = self.clean_info_response(e)
2326                rv['info'].append(e)
2327        self.state_lock.release()
2328        return rv
2329
2330
2331    def terminate_experiment(self, req, fid):
2332        """
2333        Swap this experiment out on the federants and delete the shared
2334        information
2335        """
2336        tbparams = { }
2337        req = req.get('TerminateRequestBody', None)
2338        if not req:
2339            raise service_error(service_error.req,
2340                    "Bad request format (no TerminateRequestBody)")
2341        force = req.get('force', False)
2342        exp = req.get('experiment', None)
2343        if exp:
2344            if exp.has_key('fedid'):
2345                key = exp['fedid']
2346                keytype = "fedid"
2347            elif exp.has_key('localname'):
2348                key = exp['localname']
2349                keytype = "localname"
2350            else:
2351                raise service_error(service_error.req, "Unknown lookup type")
2352        else:
2353            raise service_error(service_error.req, "No request?")
2354
2355        self.check_experiment_access(fid, key)
2356
2357        dealloc_list = [ ]
2358
2359
2360        # Create a logger that logs to the dealloc_list as well as to the main
2361        # log file.
2362        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2363        h = logging.StreamHandler(self.list_log(dealloc_list))
2364        # XXX: there should be a global one of these rather than repeating the
2365        # code.
2366        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2367                    '%d %b %y %H:%M:%S'))
2368        dealloc_log.addHandler(h)
2369
2370        self.state_lock.acquire()
2371        fed_exp = self.state.get(key, None)
2372
2373        if fed_exp:
2374            # This branch of the conditional holds the lock to generate a
2375            # consistent temporary tbparams variable to deallocate experiments.
2376            # It releases the lock to do the deallocations and reacquires it to
2377            # remove the experiment state when the termination is complete.
2378
2379            # First make sure that the experiment creation is complete.
2380            status = fed_exp.get('experimentStatus', None)
2381
2382            if status:
2383                if status in ('starting', 'terminating'):
2384                    if not force:
2385                        self.state_lock.release()
2386                        raise service_error(service_error.partial, 
2387                                'Experiment still being created or destroyed')
2388                    else:
2389                        self.log.warning('Experiment in %s state ' % status + \
2390                                'being terminated by force.')
2391            else:
2392                # No status??? trouble
2393                self.state_lock.release()
2394                raise service_error(service_error.internal,
2395                        "Experiment has no status!?")
2396
2397            ids = []
2398            #  experimentID is a list of dicts that are self-describing
2399            #  identifiers.  This finds all the fedids and localnames - the
2400            #  keys of self.state - and puts them into ids.
2401            for id in fed_exp.get('experimentID', []):
2402                if id.has_key('fedid'): ids.append(id['fedid'])
2403                if id.has_key('localname'): ids.append(id['localname'])
2404
2405            # Construct enough of the tbparams to make the stop_segment calls
2406            # work
2407            for fed in fed_exp.get('federant', []):
2408                try:
2409                    for e in fed['name']:
2410                        eid = e.get('localname', None)
2411                        if eid: break
2412                    else:
2413                        continue
2414
2415                    p = fed['emulab']['project']
2416
2417                    project = p['name']['localname']
2418                    tb = p['testbed']['localname']
2419                    user = p['user'][0]['userID']['localname']
2420
2421                    domain = fed['emulab']['domain']
2422                    host  = fed['emulab']['ops']
2423                    aid = fed['allocID']
2424                except KeyError, e:
2425                    continue
2426                tbparams[tb] = {\
2427                        'user': user,\
2428                        'domain': domain,\
2429                        'project': project,\
2430                        'host': host,\
2431                        'eid': eid,\
2432                        'aid': aid,\
2433                    }
2434            fed_exp['experimentStatus'] = 'terminating'
2435            if self.state_filename: self.write_state()
2436            self.state_lock.release()
2437
2438            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2439            # then completes, so we can't wait if nothing starts.  So, no
2440            # tbparams, no start.
2441            if len(tbparams) > 0:
2442                thread_pool = self.thread_pool(self.nthreads)
2443                for tb in tbparams.keys():
2444                    # Create and start a thread to stop the segment
2445                    thread_pool.wait_for_slot()
2446                    t  = self.pooled_thread(\
2447                            target=self.stop_segment(log=dealloc_log,
2448                                keyfile=self.ssh_privkey_file, debug=self.debug), 
2449                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2450                            pdata=thread_pool, trace_file=self.trace_file)
2451                    t.start()
2452                # Wait for completions
2453                thread_pool.wait_for_all_done()
2454
2455            # release the allocations (failed experiments have done this
2456            # already, and starting experiments may be in odd states, so we
2457            # ignore errors releasing those allocations
2458            try: 
2459                for tb in tbparams.keys():
2460                    self.release_access(tb, tbparams[tb]['aid'])
2461            except service_error, e:
2462                if status != 'failed' and not force:
2463                    raise e
2464
2465            # Remove the terminated experiment
2466            self.state_lock.acquire()
2467            for id in ids:
2468                if self.state.has_key(id): del self.state[id]
2469
2470            if self.state_filename: self.write_state()
2471            self.state_lock.release()
2472
2473            return { 
2474                    'experiment': exp , 
2475                    'deallocationLog': "".join(dealloc_list),
2476                    }
2477        else:
2478            # Don't forget to release the lock
2479            self.state_lock.release()
2480            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.