source: fedd/federation/experiment_control.py @ 0b4e272

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 0b4e272 was 886307f, checked in by Ted Faber <faber@…>, 15 years ago

better subprocess output containment

  • Property mode set to 100644
File size: 90.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
550            """
551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
555
556            try:
557                dnull = open("/dev/null", "w")
558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
568                else:
569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
580                    else:
581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
585                else:
586                    return sub.wait() == 0
587            else:
588                if timeout == 0:
589                    self.log.debug("debug timeout raised on %s " % sh_str)
590                    raise self.ssh_cmd_timeout()
591                else:
592                    return True
593
594    class start_segment(emulab_segment):
595        def __init__(self, log=None, keyfile=None, debug=False):
596            experiment_control_local.emulab_segment.__init__(self,
597                    log=log, keyfile=keyfile, debug=debug)
598
599        def create_config_tree(self, src_dir, dest_dir, script):
600            """
601            Append commands to script that will create the directory hierarchy
602            on the remote federant.
603            """
604
605            if os.path.isdir(src_dir):
606                print >>script, "mkdir -p %s" % dest_dir
607                print >>script, "chmod 770 %s" % dest_dir
608
609                for f in os.listdir(src_dir):
610                    if os.path.isdir(f):
611                        self.create_config_tree("%s/%s" % (src_dir, f), 
612                                "%s/%s" % (dest_dir, f), script)
613            else:
614                self.log.debug("[create_config_tree]: Not a directory: %s" \
615                        % src_dir)
616
617        def ship_configs(self, host, user, src_dir, dest_dir):
618            """
619            Copy federant-specific configuration files to the federant.
620            """
621            for f in os.listdir(src_dir):
622                if os.path.isdir(f):
623                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
624                            "%s/%s" % (dest_dir, f)):
625                        return False
626                else:
627                    if not self.scp_file("%s/%s" % (src_dir, f), 
628                            user, host, dest_dir):
629                        return False
630            return True
631
632        def get_state(self, user, host, tb, pid, eid):
633            # command to test experiment state
634            expinfo_exec = "/usr/testbed/bin/expinfo" 
635            # Regular expressions to parse the expinfo response
636            state_re = re.compile("State:\s+(\w+)")
637            no_exp_re = re.compile("^No\s+such\s+experiment")
638            swapping_re = re.compile("^No\s+information\s+available.")
639            state = None    # Experiment state parsed from expinfo
640            # The expinfo ssh command.  Note the identity restriction to use
641            # only the identity provided in the pubkey given.
642            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
643                    'StrictHostKeyChecking yes', '-i', 
644                    self.ssh_privkey_file, "%s@%s" % (user, host), 
645                    expinfo_exec, pid, eid]
646
647            dev_null = None
648            try:
649                dev_null = open("/dev/null", "a")
650            except IOError, e:
651                self.log.error("[get_state]: can't open /dev/null: %s" %e)
652
653            if self.debug:
654                state = 'swapped'
655                rv = 0
656            else:
657                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
658                        close_fds=True)
659                for line in status.stdout:
660                    m = state_re.match(line)
661                    if m: state = m.group(1)
662                    else:
663                        for reg, st in ((no_exp_re, "none"),
664                                (swapping_re, "swapping")):
665                            m = reg.match(line)
666                            if m: state = st
667                rv = status.wait()
668
669            # If the experiment is not present the subcommand returns a
670            # non-zero return value.  If we successfully parsed a "none"
671            # outcome, ignore the return code.
672            if rv != 0 and state != 'none':
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            elif state not in ('active', 'swapped', 'swapping', 'none'):
677                raise service_error(service_error.internal,
678                        "Cannot get status of segment %s:%s/%s" % \
679                                (tb, pid, eid))
680            else: return state
681
682
683        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
684            """
685            Start a sub-experiment on a federant.
686
687            Get the current state, modify or create as appropriate, ship data
688            and configs and start the experiment.  There are small ordering
689            differences based on the initial state of the sub-experiment.
690            """
691            # ops node in the federant
692            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
693            user = tbparams[tb]['user']     # federant user
694            pid = tbparams[tb]['project']   # federant project
695            # XXX
696            base_confs = ( "hosts",)
697            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
698            # Configuration directories on the remote machine
699            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
700            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
701            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
702
703            state = self.get_state(user, host, tb, pid, eid)
704
705            self.log.debug("[start_segment]: %s: %s" % (tb, state))
706            self.log.info("[start_segment]:transferring experiment to %s" % tb)
707
708            if not self.scp_file("%s/%s/%s" % \
709                    (tmpdir, tb, tclfile), user, host):
710                return False
711           
712            if state == 'none':
713                # Create a null copy of the experiment so that we capture any
714                # logs there if the modify fails.  Emulab software discards the
715                # logs from a failed startexp
716                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
717                    return False
718                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
719                timedout = False
720                try:
721                    if not self.ssh_cmd(user, host,
722                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
723                            "-e %s null.tcl") % (pid, eid), "startexp",
724                            timeout=60 * 10):
725                        return False
726                except self.ssh_cmd_timeout:
727                    timedout = True
728
729                if timedout:
730                    state = self.get_state(user, host, tb, pid, eid)
731                    if state != "swapped":
732                        return False
733
734           
735            # Open up a temporary file to contain a script for setting up the
736            # filespace for the new experiment.
737            self.log.info("[start_segment]: creating script file")
738            try:
739                sf, scriptname = tempfile.mkstemp()
740                scriptfile = os.fdopen(sf, 'w')
741            except IOError:
742                return False
743
744            scriptbase = os.path.basename(scriptname)
745
746            # Script the filesystem changes
747            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
748            # Clear and create the tarfiles and rpm directories
749            for d in (tarfiles_dir, rpms_dir):
750                print >>scriptfile, "/bin/rm -rf %s/*" % d
751                print >>scriptfile, "mkdir -p %s" % d
752            print >>scriptfile, 'mkdir -p %s' % proj_dir
753            self.create_config_tree("%s/%s" % (tmpdir, tb),
754                    proj_dir, scriptfile)
755            if os.path.isdir("%s/tarfiles" % tmpdir):
756                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
757                        scriptfile)
758            if os.path.isdir("%s/rpms" % tmpdir):
759                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
760                        scriptfile)
761            print >>scriptfile, "rm -f %s" % scriptbase
762            scriptfile.close()
763
764            # Move the script to the remote machine
765            # XXX: could collide tempfile names on the remote host
766            if self.scp_file(scriptname, user, host, scriptbase):
767                os.remove(scriptname)
768            else:
769                return False
770
771            # Execute the script (and the script's last line deletes it)
772            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
773                return False
774
775            for f in base_confs:
776                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
777                        "%s/%s" % (proj_dir, f)):
778                    return False
779            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
780                    proj_dir):
781                return False
782            if os.path.isdir("%s/tarfiles" % tmpdir):
783                if not self.ship_configs(host, user,
784                        "%s/tarfiles" % tmpdir, tarfiles_dir):
785                    return False
786            if os.path.isdir("%s/rpms" % tmpdir):
787                if not self.ship_configs(host, user,
788                        "%s/rpms" % tmpdir, tarfiles_dir):
789                    return False
790            # Stage the new configuration (active experiments will stay swapped
791            # in now)
792            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
793            try:
794                if not self.ssh_cmd(user, host,
795                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
796                                (pid, eid, tclfile),
797                        "modexp", timeout= 60 * 10):
798                    return False
799            except self.ssh_cmd_timeout:
800                self.log.error("Modify command failed to complete in time")
801                # There's really no way to see if this succeeded or failed, so
802                # if it hangs, assume the worst.
803                return False
804            # Active experiments are still swapped, this swaps the others in.
805            if state != 'active':
806                self.log.info("[start_segment]: Swapping %s in on %s" % \
807                        (eid, tb))
808                timedout = False
809                try:
810                    if not self.ssh_cmd(user, host,
811                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
812                            "swapexp", timeout=10*60):
813                        return False
814                except self.ssh_cmd_timeout:
815                    timedout = True
816               
817                # If the command was terminated, but completed successfully,
818                # report success.
819                if timedout:
820                    self.log.debug("[start_segment]: swapin timed out " +\
821                            "checking state")
822                    state = self.get_state(user, host, tb, pid, eid)
823                    self.log.debug("[start_segment]: state is %s" % state)
824                    return state == 'active'
825            # Everything has gone OK.
826            return True
827
828    class stop_segment(emulab_segment):
829        def __init__(self, log=None, keyfile=None, debug=False):
830            experiment_control_local.emulab_segment.__init__(self,
831                    log=log, keyfile=keyfile, debug=debug)
832
833        def __call__(self, tb, eid, tbparams):
834            """
835            Stop a sub experiment by calling swapexp on the federant
836            """
837            user = tbparams[tb]['user']
838            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
839            pid = tbparams[tb]['project']
840
841            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
842            rv = False
843            try:
844                # Clean out tar files: we've gone over quota in the past
845                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
846                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
847                        (pid, eid))
848                rv = self.ssh_cmd(user, host,
849                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
850            except self.ssh_cmd_timeout:
851                rv = False
852            return rv
853
854       
855    def generate_ssh_keys(self, dest, type="rsa" ):
856        """
857        Generate a set of keys for the gateways to use to talk.
858
859        Keys are of type type and are stored in the required dest file.
860        """
861        valid_types = ("rsa", "dsa")
862        t = type.lower();
863        if t not in valid_types: raise ValueError
864        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
865
866        try:
867            trace = open("/dev/null", "w")
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Cannot open /dev/null??");
871
872        # May raise CalledProcessError
873        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
874        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
875        if rv != 0:
876            raise service_error(service_error.internal, 
877                    "Cannot generate nonce ssh keys.  %s return code %d" \
878                            % (self.ssh_keygen, rv))
879
880    def gentopo(self, str):
881        """
882        Generate the topology dtat structure from the splitter's XML
883        representation of it.
884
885        The topology XML looks like:
886            <experiment>
887                <nodes>
888                    <node><vname></vname><ips>ip1:ip2</ips></node>
889                </nodes>
890                <lans>
891                    <lan>
892                        <vname></vname><vnode></vnode><ip></ip>
893                        <bandwidth></bandwidth><member>node:port</member>
894                    </lan>
895                </lans>
896        """
897        class topo_parse:
898            """
899            Parse the topology XML and create the dats structure.
900            """
901            def __init__(self):
902                # Typing of the subelements for data conversion
903                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
904                self.int_subelements = ( 'bandwidth',)
905                self.float_subelements = ( 'delay',)
906                # The final data structure
907                self.nodes = [ ]
908                self.lans =  [ ]
909                self.topo = { \
910                        'node': self.nodes,\
911                        'lan' : self.lans,\
912                    }
913                self.element = { }  # Current element being created
914                self.chars = ""     # Last text seen
915
916            def end_element(self, name):
917                # After each sub element the contents is added to the current
918                # element or to the appropriate list.
919                if name == 'node':
920                    self.nodes.append(self.element)
921                    self.element = { }
922                elif name == 'lan':
923                    self.lans.append(self.element)
924                    self.element = { }
925                elif name in self.str_subelements:
926                    self.element[name] = self.chars
927                    self.chars = ""
928                elif name in self.int_subelements:
929                    self.element[name] = int(self.chars)
930                    self.chars = ""
931                elif name in self.float_subelements:
932                    self.element[name] = float(self.chars)
933                    self.chars = ""
934
935            def found_chars(self, data):
936                self.chars += data.rstrip()
937
938
939        tp = topo_parse();
940        parser = xml.parsers.expat.ParserCreate()
941        parser.EndElementHandler = tp.end_element
942        parser.CharacterDataHandler = tp.found_chars
943
944        parser.Parse(str)
945
946        return tp.topo
947       
948
949    def genviz(self, topo):
950        """
951        Generate the visualization the virtual topology
952        """
953
954        neato = "/usr/local/bin/neato"
955        # These are used to parse neato output and to create the visualization
956        # file.
957        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
958        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
959                "%s</type></node>"
960
961        try:
962            # Node names
963            nodes = [ n['vname'] for n in topo['node'] ]
964            topo_lans = topo['lan']
965        except KeyError:
966            raise service_error(service_error.internal, "Bad topology")
967
968        lans = { }
969        links = { }
970
971        # Walk through the virtual topology, organizing the connections into
972        # 2-node connections (links) and more-than-2-node connections (lans).
973        # When a lan is created, it's added to the list of nodes (there's a
974        # node in the visualization for the lan).
975        for l in topo_lans:
976            if links.has_key(l['vname']):
977                if len(links[l['vname']]) < 2:
978                    links[l['vname']].append(l['vnode'])
979                else:
980                    nodes.append(l['vname'])
981                    lans[l['vname']] = links[l['vname']]
982                    del links[l['vname']]
983                    lans[l['vname']].append(l['vnode'])
984            elif lans.has_key(l['vname']):
985                lans[l['vname']].append(l['vnode'])
986            else:
987                links[l['vname']] = [ l['vnode'] ]
988
989
990        # Open up a temporary file for dot to turn into a visualization
991        try:
992            df, dotname = tempfile.mkstemp()
993            dotfile = os.fdopen(df, 'w')
994        except IOError:
995            raise service_error(service_error.internal,
996                    "Failed to open file in genviz")
997
998        try:
999            dnull = open('/dev/null', 'w')
1000        except IOError:
1001            service_error(service_error.internal,
1002                    "Failed to open /dev/null in genviz")
1003
1004        # Generate a dot/neato input file from the links, nodes and lans
1005        try:
1006            print >>dotfile, "graph G {"
1007            for n in nodes:
1008                print >>dotfile, '\t"%s"' % n
1009            for l in links.keys():
1010                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1011            for l in lans.keys():
1012                for n in lans[l]:
1013                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1014            print >>dotfile, "}"
1015            dotfile.close()
1016        except TypeError:
1017            raise service_error(service_error.internal,
1018                    "Single endpoint link in vtopo")
1019        except IOError:
1020            raise service_error(service_error.internal, "Cannot write dot file")
1021
1022        # Use dot to create a visualization
1023        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1024                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1025                close_fds=True)
1026        dnull.close()
1027
1028        # Translate dot to vis format
1029        vis_nodes = [ ]
1030        vis = { 'node': vis_nodes }
1031        for line in dot.stdout:
1032            m = vis_re.match(line)
1033            if m:
1034                vn = m.group(1)
1035                vis_node = {'name': vn, \
1036                        'x': float(m.group(2)),\
1037                        'y' : float(m.group(3)),\
1038                    }
1039                if vn in links.keys() or vn in lans.keys():
1040                    vis_node['type'] = 'lan'
1041                else:
1042                    vis_node['type'] = 'node'
1043                vis_nodes.append(vis_node)
1044        rv = dot.wait()
1045
1046        os.remove(dotname)
1047        if rv == 0 : return vis
1048        else: return None
1049
1050    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1051            access_user):
1052        """
1053        Get access to testbed through fedd and set the parameters for that tb
1054        """
1055        uri = self.tbmap.get(tb, None)
1056        if not uri:
1057            raise service_error(serice_error.server_config, 
1058                    "Unknown testbed: %s" % tb)
1059
1060        # currently this lumps all users into one service access group
1061        service_keys = [ a for u in user \
1062                for a in u.get('access', []) \
1063                    if a.has_key('sshPubkey')]
1064
1065        if len(service_keys) == 0:
1066            raise service_error(service_error.req, 
1067                    "Must have at least one SSH pubkey for services")
1068
1069
1070        for p, u in access_user:
1071            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1072                    "to %s") %  ((p or "None"), u, uri))
1073
1074            if p:
1075                # Request with user and project specified
1076                req = {\
1077                        'destinationTestbed' : { 'uri' : uri },
1078                        'project': { 
1079                            'name': {'localname': p},
1080                            'user': [ {'userID': { 'localname': u } } ],
1081                            },
1082                        'user':  user,
1083                        'allocID' : { 'localname': 'test' },
1084                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1085                        'serviceAccess' : service_keys
1086                    }
1087            else:
1088                # Request with only user specified
1089                req = {\
1090                        'destinationTestbed' : { 'uri' : uri },
1091                        'user':  [ {'userID': { 'localname': u } } ],
1092                        'allocID' : { 'localname': 'test' },
1093                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1094                        'serviceAccess' : service_keys
1095                    }
1096
1097            if tb == master:
1098                # NB, the export_project parameter is a dict that includes
1099                # the type
1100                req['exportProject'] = export_project
1101
1102            # node resources if any
1103            if nodes != None and len(nodes) > 0:
1104                rnodes = [ ]
1105                for n in nodes:
1106                    rn = { }
1107                    image, hw, count = n.split(":")
1108                    if image: rn['image'] = [ image ]
1109                    if hw: rn['hardware'] = [ hw ]
1110                    if count and int(count) >0 : rn['count'] = int(count)
1111                    rnodes.append(rn)
1112                req['resources']= { }
1113                req['resources']['node'] = rnodes
1114
1115            try:
1116                if self.local_access.has_key(uri):
1117                    # Local access call
1118                    req = { 'RequestAccessRequestBody' : req }
1119                    r = self.local_access[uri].RequestAccess(req, 
1120                            fedid(file=self.cert_file))
1121                    r = { 'RequestAccessResponseBody' : r }
1122                else:
1123                    r = self.call_RequestAccess(uri, req, 
1124                            self.cert_file, self.cert_pwd, self.trusted_certs)
1125            except service_error, e:
1126                if e.code == service_error.access:
1127                    self.log.debug("[get_access] Access denied")
1128                    r = None
1129                    continue
1130                else:
1131                    raise e
1132
1133            if r.has_key('RequestAccessResponseBody'):
1134                # Through to here we have a valid response, not a fault.
1135                # Access denied is a fault, so something better or worse than
1136                # access denied has happened.
1137                r = r['RequestAccessResponseBody']
1138                self.log.debug("[get_access] Access granted")
1139                break
1140            else:
1141                raise service_error(service_error.protocol,
1142                        "Bad proxy response")
1143       
1144        if not r:
1145            raise service_error(service_error.access, 
1146                    "Access denied by %s (%s)" % (tb, uri))
1147
1148        e = r['emulab']
1149        p = e['project']
1150        tbparam[tb] = { 
1151                "boss": e['boss'],
1152                "host": e['ops'],
1153                "domain": e['domain'],
1154                "fs": e['fileServer'],
1155                "eventserver": e['eventServer'],
1156                "project": unpack_id(p['name']),
1157                "emulab" : e,
1158                "allocID" : r['allocID'],
1159                }
1160        # Make the testbed name be the label the user applied
1161        p['testbed'] = {'localname': tb }
1162
1163        for u in p['user']:
1164            role = u.get('role', None)
1165            if role == 'experimentCreation':
1166                tbparam[tb]['user'] = unpack_id(u['userID'])
1167                break
1168        else:
1169            raise service_error(service_error.internal, 
1170                    "No createExperimentUser from %s" %tb)
1171
1172        # Add attributes to barameter space.  We don't allow attributes to
1173        # overlay any parameters already installed.
1174        for a in e['fedAttr']:
1175            try:
1176                if a['attribute'] and isinstance(a['attribute'], basestring)\
1177                        and not tbparam[tb].has_key(a['attribute'].lower()):
1178                    tbparam[tb][a['attribute'].lower()] = a['value']
1179            except KeyError:
1180                self.log.error("Bad attribute in response: %s" % a)
1181       
1182    def release_access(self, tb, aid):
1183        """
1184        Release access to testbed through fedd
1185        """
1186
1187        uri = self.tbmap.get(tb, None)
1188        if not uri:
1189            raise service_error(serice_error.server_config, 
1190                    "Unknown testbed: %s" % tb)
1191
1192        if self.local_access.has_key(uri):
1193            resp = self.local_access[uri].ReleaseAccess(\
1194                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1195                    fedid(file=self.cert_file))
1196            resp = { 'ReleaseAccessResponseBody': resp } 
1197        else:
1198            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1199                    self.cert_file, self.cert_pwd, self.trusted_certs)
1200
1201        # better error coding
1202
1203    def remote_splitter(self, uri, desc, master):
1204
1205        req = {
1206                'description' : { 'ns2description': desc },
1207                'master': master,
1208                'include_fedkit': bool(self.fedkit),
1209                'include_gatewaykit': bool(self.gatewaykit)
1210            }
1211
1212        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1213                self.trusted_certs)
1214
1215        if r.has_key('Ns2SplitResponseBody'):
1216            r = r['Ns2SplitResponseBody']
1217            if r.has_key('output'):
1218                return r['output'].splitlines()
1219            else:
1220                raise service_error(service_error.protocol, 
1221                        "Bad splitter response (no output)")
1222        else:
1223            raise service_error(service_error.protocol, "Bad splitter response")
1224       
1225    class current_testbed:
1226        """
1227        Object for collecting the current testbed description.  The testbed
1228        description is saved to a file with the local testbed variables
1229        subsittuted line by line.
1230        """
1231        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1232            def tar_list_to_string(tl):
1233                if tl is None: return None
1234
1235                rv = ""
1236                for t in tl:
1237                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1238                            (t[0], os.path.basename(t[1]))
1239                return rv
1240
1241
1242            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1243            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1244            self.current_testbed = None
1245            self.testbed_file = None
1246
1247            self.def_expstart = \
1248                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1249            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1250            self.def_gwstart = \
1251                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1252            self.def_mgwstart = \
1253                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1254            self.def_gwimage = "FBSD61-TUNNEL2";
1255            self.def_gwtype = "pc";
1256            self.def_mgwcmd = '# '
1257            self.def_mgwcmdparams = ''
1258            self.def_gwcmd = '# '
1259            self.def_gwcmdparams = ''
1260
1261            self.eid = eid
1262            self.tmpdir = tmpdir
1263            # Convert fedkit and gateway kit (which are lists of tuples) into a
1264            # substituition string.
1265            self.fedkit = tar_list_to_string(fedkit)
1266            self.gatewaykit = tar_list_to_string(gatewaykit)
1267
1268        def __call__(self, line, master, allocated, tbparams):
1269            # Capture testbed topology descriptions
1270            if self.current_testbed == None:
1271                m = self.begin_testbed.match(line)
1272                if m != None:
1273                    self.current_testbed = m.group(1)
1274                    if self.current_testbed == None:
1275                        raise service_error(service_error.req,
1276                                "Bad request format (unnamed testbed)")
1277                    allocated[self.current_testbed] = \
1278                            allocated.get(self.current_testbed,0) + 1
1279                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1280                    if not os.path.exists(tb_dir):
1281                        try:
1282                            os.mkdir(tb_dir)
1283                        except IOError:
1284                            raise service_error(service_error.internal,
1285                                    "Cannot create %s" % tb_dir)
1286                    try:
1287                        self.testbed_file = open("%s/%s.%s.tcl" %
1288                                (tb_dir, self.eid, self.current_testbed), 'w')
1289                    except IOError:
1290                        self.testbed_file = None
1291                    return True
1292                else: return False
1293            else:
1294                m = self.end_testbed.match(line)
1295                if m != None:
1296                    if m.group(1) != self.current_testbed:
1297                        raise service_error(service_error.internal, 
1298                                "Mismatched testbed markers!?")
1299                    if self.testbed_file != None: 
1300                        self.testbed_file.close()
1301                        self.testbed_file = None
1302                    self.current_testbed = None
1303                elif self.testbed_file:
1304                    # Substitute variables and put the line into the local
1305                    # testbed file.
1306                    gwtype = tbparams[self.current_testbed].get(\
1307                            'connectortype', self.def_gwtype)
1308                    gwimage = tbparams[self.current_testbed].get(\
1309                            'connectorimage', self.def_gwimage)
1310                    mgwstart = tbparams[self.current_testbed].get(\
1311                            'masterconnectorstartcmd', self.def_mgwstart)
1312                    mexpstart = tbparams[self.current_testbed].get(\
1313                            'masternodestartcmd', self.def_mexpstart)
1314                    gwstart = tbparams[self.current_testbed].get(\
1315                            'slaveconnectorstartcmd', self.def_gwstart)
1316                    expstart = tbparams[self.current_testbed].get(\
1317                            'slavenodestartcmd', self.def_expstart)
1318                    project = tbparams[self.current_testbed].get('project')
1319                    gwcmd = tbparams[self.current_testbed].get(\
1320                            'slaveconnectorcmd', self.def_gwcmd)
1321                    gwcmdparams = tbparams[self.current_testbed].get(\
1322                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1323                    mgwcmd = tbparams[self.current_testbed].get(\
1324                            'masterconnectorcmd', self.def_gwcmd)
1325                    mgwcmdparams = tbparams[self.current_testbed].get(\
1326                            'masterconnectorcmdparams', self.def_gwcmdparams)
1327                    line = re.sub("GWTYPE", gwtype, line)
1328                    line = re.sub("GWIMAGE", gwimage, line)
1329                    if self.current_testbed == master:
1330                        line = re.sub("GWSTART", mgwstart, line)
1331                        line = re.sub("EXPSTART", mexpstart, line)
1332                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1333                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1334                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1335                    else:
1336                        line = re.sub("GWSTART", gwstart, line)
1337                        line = re.sub("EXPSTART", expstart, line)
1338                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1339                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1340                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1341                    #These expansions contain EID and PROJDIR.  NB these are
1342                    # local fedkit and gatewaykit, which are strings.
1343                    if self.fedkit:
1344                        line = re.sub("FEDKIT", self.fedkit, line)
1345                    if self.gatewaykit:
1346                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1347                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1348                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1349                    line = re.sub("EID", self.eid, line)
1350                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1351                            (project, self.eid), line)
1352                    print >>self.testbed_file, line
1353                return True
1354
1355    class allbeds:
1356        """
1357        Process the Allbeds section.  Get access to each federant and save the
1358        parameters in tbparams
1359        """
1360        def __init__(self, get_access):
1361            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1362            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1363            self.in_allbeds = False
1364            self.get_access = get_access
1365
1366        def __call__(self, line, user, tbparams, master, export_project,
1367                access_user):
1368            # Testbed access parameters
1369            if not self.in_allbeds:
1370                if self.begin_allbeds.match(line):
1371                    self.in_allbeds = True
1372                    return True
1373                else:
1374                    return False
1375            else:
1376                if self.end_allbeds.match(line):
1377                    self.in_allbeds = False
1378                else:
1379                    nodes = line.split('|')
1380                    tb = nodes.pop(0)
1381                    self.get_access(tb, nodes, user, tbparams, master,
1382                            export_project, access_user)
1383                return True
1384
1385    class gateways:
1386        def __init__(self, eid, master, tmpdir, gw_pubkey,
1387                gw_secretkey, copy_file, fedkit):
1388            self.begin_gateways = \
1389                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1390            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1391            self.current_gateways = None
1392            self.control_gateway = None
1393            self.active_end = { }
1394
1395            self.eid = eid
1396            self.master = master
1397            self.tmpdir = tmpdir
1398            self.gw_pubkey_base = gw_pubkey
1399            self.gw_secretkey_base = gw_secretkey
1400
1401            self.copy_file = copy_file
1402            self.fedkit = fedkit
1403
1404
1405        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1406                active_end, tbparams, dtb, myname, desthost, type):
1407            """
1408            Produce a gateway configuration file from a gateways line.
1409            """
1410
1411            sproject = tbparams[gw].get('project', 'project')
1412            dproject = tbparams[dtb].get('project', 'project')
1413            sdomain = ".%s.%s%s" % (eid, sproject,
1414                    tbparams[gw].get('domain', ".example.com"))
1415            ddomain = ".%s.%s%s" % (eid, dproject,
1416                    tbparams[dtb].get('domain', ".example.com"))
1417            boss = tbparams[master].get('boss', "boss")
1418            fs = tbparams[master].get('fs', "fs")
1419            event_server = "%s%s" % \
1420                    (tbparams[gw].get('eventserver', "event_server"),
1421                            tbparams[gw].get('domain', "example.com"))
1422            remote_event_server = "%s%s" % \
1423                    (tbparams[dtb].get('eventserver', "event_server"),
1424                            tbparams[dtb].get('domain', "example.com"))
1425            seer_control = "%s%s" % \
1426                    (tbparams[gw].get('control', "control"), sdomain)
1427            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1428
1429            if self.fedkit:
1430                remote_script_dir = "/usr/local/federation/bin"
1431                local_script_dir = "/usr/local/federation/bin"
1432            else:
1433                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1434                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1435
1436            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1437            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1438            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1439
1440            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1441            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1442
1443            # translate to lower case so the `hostname` hack for specifying
1444            # configuration files works.
1445            conf_file = conf_file.lower();
1446            remote_conf_file = remote_conf_file.lower();
1447
1448            if dtb == master:
1449                active = "false"
1450            elif gw == master:
1451                active = "true"
1452            elif active_end.has_key('%s-%s' % (dtb, gw)):
1453                active = "false"
1454            else:
1455                active_end['%s-%s' % (gw, dtb)] = 1
1456                active = "true"
1457
1458            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1459            print >>gwconfig, "Active: %s" % active
1460            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1461            if tunnel_iface:
1462                print >>gwconfig, "Interface: %s" % tunnel_iface
1463            print >>gwconfig, "BossName: %s" % boss
1464            print >>gwconfig, "FsName: %s" % fs
1465            print >>gwconfig, "EventServerName: %s" % event_server
1466            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1467            print >>gwconfig, "SeerControl: %s" % seer_control
1468            print >>gwconfig, "Type: %s" % type
1469            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1470            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1471                    local_script_dir
1472            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1473            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1474            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1475                    (remote_conf_dir, remote_conf_file)
1476            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1477            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1478            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1479            gwconfig.close()
1480
1481            return active == "true"
1482
1483        def __call__(self, line, allocated, tbparams):
1484            # Process gateways
1485            if not self.current_gateways:
1486                m = self.begin_gateways.match(line)
1487                if m:
1488                    self.current_gateways = m.group(1)
1489                    if allocated.has_key(self.current_gateways):
1490                        # This test should always succeed
1491                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1492                        if not os.path.exists(tb_dir):
1493                            try:
1494                                os.mkdir(tb_dir)
1495                            except IOError:
1496                                raise service_error(service_error.internal,
1497                                        "Cannot create %s" % tb_dir)
1498                    else:
1499                        # XXX
1500                        self.log.error("[gateways]: Ignoring gateways for " + \
1501                                "unknown testbed %s" % self.current_gateways)
1502                        self.current_gateways = None
1503                    return True
1504                else:
1505                    return False
1506            else:
1507                m = self.end_gateways.match(line)
1508                if m :
1509                    if m.group(1) != self.current_gateways:
1510                        raise service_error(service_error.internal,
1511                                "Mismatched gateway markers!?")
1512                    if self.control_gateway:
1513                        try:
1514                            cc = open("%s/%s/client.conf" %
1515                                    (self.tmpdir, self.current_gateways), 'w')
1516                            print >>cc, "ControlGateway: %s" % \
1517                                    self.control_gateway
1518                            if tbparams[self.master].has_key('smbshare'):
1519                                print >>cc, "SMBSHare: %s" % \
1520                                        tbparams[self.master]['smbshare']
1521                            print >>cc, "ProjectUser: %s" % \
1522                                    tbparams[self.master]['user']
1523                            print >>cc, "ProjectName: %s" % \
1524                                    tbparams[self.master]['project']
1525                            print >>cc, "ExperimentID: %s/%s" % \
1526                                    ( tbparams[self.master]['project'], \
1527                                    self.eid )
1528                            cc.close()
1529                        except IOError:
1530                            raise service_error(service_error.internal,
1531                                    "Error creating client config")
1532                        # XXX: This seer specific file should disappear
1533                        try:
1534                            cc = open("%s/%s/seer.conf" %
1535                                    (self.tmpdir, self.current_gateways),
1536                                    'w')
1537                            if self.current_gateways != self.master:
1538                                print >>cc, "ControlNode: %s" % \
1539                                        self.control_gateway
1540                            print >>cc, "ExperimentID: %s/%s" % \
1541                                    ( tbparams[self.master]['project'], \
1542                                    self.eid )
1543                            cc.close()
1544                        except IOError:
1545                            raise service_error(service_error.internal,
1546                                    "Error creating seer config")
1547                    else:
1548                        debug.error("[gateways]: No control gateway for %s" %\
1549                                    self.current_gateways)
1550                    self.current_gateways = None
1551                else:
1552                    dtb, myname, desthost, type = line.split(" ")
1553
1554                    if type == "control" or type == "both":
1555                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1556                                self.eid, 
1557                                tbparams[self.current_gateways]['project'],
1558                                tbparams[self.current_gateways]['domain'])
1559                    try:
1560                        active = self.gateway_conf_file(self.current_gateways,
1561                                self.master, self.eid, self.gw_pubkey_base,
1562                                self.gw_secretkey_base,
1563                                self.active_end, tbparams, dtb, myname,
1564                                desthost, type)
1565                    except IOError, e:
1566                        raise service_error(service_error.internal,
1567                                "Failed to write config file for %s" % \
1568                                        self.current_gateway)
1569           
1570                    gw_pubkey = "%s/keys/%s" % \
1571                            (self.tmpdir, self.gw_pubkey_base)
1572                    gw_secretkey = "%s/keys/%s" % \
1573                            (self.tmpdir, self.gw_secretkey_base)
1574
1575                    pkfile = "%s/%s/%s" % \
1576                            ( self.tmpdir, self.current_gateways, 
1577                                    self.gw_pubkey_base)
1578                    skfile = "%s/%s/%s" % \
1579                            ( self.tmpdir, self.current_gateways, 
1580                                    self.gw_secretkey_base)
1581
1582                    if not os.path.exists(pkfile):
1583                        try:
1584                            self.copy_file(gw_pubkey, pkfile)
1585                        except IOError:
1586                            service_error(service_error.internal,
1587                                    "Failed to copy pubkey file")
1588
1589                    if active and not os.path.exists(skfile):
1590                        try:
1591                            self.copy_file(gw_secretkey, skfile)
1592                        except IOError:
1593                            service_error(service_error.internal,
1594                                    "Failed to copy secretkey file")
1595                return True
1596
1597    class shunt_to_file:
1598        """
1599        Simple class to write data between two regexps to a file.
1600        """
1601        def __init__(self, begin, end, filename):
1602            """
1603            Begin shunting on a match of begin, stop on end, send data to
1604            filename.
1605            """
1606            self.begin = re.compile(begin)
1607            self.end = re.compile(end)
1608            self.in_shunt = False
1609            self.file = None
1610            self.filename = filename
1611
1612        def __call__(self, line):
1613            """
1614            Call this on each line in the input that may be shunted.
1615            """
1616            if not self.in_shunt:
1617                if self.begin.match(line):
1618                    self.in_shunt = True
1619                    try:
1620                        self.file = open(self.filename, "w")
1621                    except:
1622                        self.file = None
1623                        raise
1624                    return True
1625                else:
1626                    return False
1627            else:
1628                if self.end.match(line):
1629                    if self.file: 
1630                        self.file.close()
1631                        self.file = None
1632                    self.in_shunt = False
1633                else:
1634                    if self.file:
1635                        print >>self.file, line
1636                return True
1637
1638    class shunt_to_list:
1639        """
1640        Same interface as shunt_to_file.  Data collected in self.list, one list
1641        element per line.
1642        """
1643        def __init__(self, begin, end):
1644            self.begin = re.compile(begin)
1645            self.end = re.compile(end)
1646            self.in_shunt = False
1647            self.list = [ ]
1648       
1649        def __call__(self, line):
1650            if not self.in_shunt:
1651                if self.begin.match(line):
1652                    self.in_shunt = True
1653                    return True
1654                else:
1655                    return False
1656            else:
1657                if self.end.match(line):
1658                    self.in_shunt = False
1659                else:
1660                    self.list.append(line)
1661                return True
1662
1663    class shunt_to_string:
1664        """
1665        Same interface as shunt_to_file.  Data collected in self.str, all in
1666        one string.
1667        """
1668        def __init__(self, begin, end):
1669            self.begin = re.compile(begin)
1670            self.end = re.compile(end)
1671            self.in_shunt = False
1672            self.str = ""
1673       
1674        def __call__(self, line):
1675            if not self.in_shunt:
1676                if self.begin.match(line):
1677                    self.in_shunt = True
1678                    return True
1679                else:
1680                    return False
1681            else:
1682                if self.end.match(line):
1683                    self.in_shunt = False
1684                else:
1685                    self.str += line
1686                return True
1687
1688    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1689            tbparams, tmpdir, alloc_log=None):
1690        started = { }           # Testbeds where a sub-experiment started
1691                                # successfully
1692
1693        # XXX
1694        fail_soft = False
1695
1696        log = alloc_log or self.log
1697
1698        thread_pool = self.thread_pool(self.nthreads)
1699        threads = [ ]
1700
1701        for tb in [ k for k in allocated.keys() if k != master]:
1702            # Create and start a thread to start the segment, and save it to
1703            # get the return value later
1704            thread_pool.wait_for_slot()
1705            t  = self.pooled_thread(\
1706                    target=self.start_segment(log=log,
1707                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1708                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1709                    pdata=thread_pool, trace_file=self.trace_file)
1710            threads.append(t)
1711            t.start()
1712
1713        # Wait until all finish
1714        thread_pool.wait_for_all_done()
1715
1716        # If none failed, start the master
1717        failed = [ t.getName() for t in threads if not t.rv ]
1718
1719        if len(failed) == 0:
1720            starter = self.start_segment(log=log, 
1721                    keyfile=self.ssh_privkey_file, debug=self.debug)
1722            if not starter(master, eid, tbparams, tmpdir):
1723                failed.append(master)
1724
1725        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1726        # If one failed clean up, unless fail_soft is set
1727        if failed:
1728            if not fail_soft:
1729                thread_pool.clear()
1730                for tb in succeeded:
1731                    # Create and start a thread to stop the segment
1732                    thread_pool.wait_for_slot()
1733                    t  = self.pooled_thread(\
1734                            target=self.stop_segment(log=log,
1735                                keyfile=self.ssh_privkey_file,
1736                                debug=self.debug), 
1737                            args=(tb, eid, tbparams), name=tb,
1738                            pdata=thread_pool, trace_file=self.trace_file)
1739                    t.start()
1740                # Wait until all finish
1741                thread_pool.wait_for_all_done()
1742
1743                # release the allocations
1744                for tb in tbparams.keys():
1745                    self.release_access(tb, tbparams[tb]['allocID'])
1746                # Remove the placeholder
1747                self.state_lock.acquire()
1748                self.state[eid]['experimentStatus'] = 'failed'
1749                if self.state_filename: self.write_state()
1750                self.state_lock.release()
1751
1752                #raise service_error(service_error.federant,
1753                #    "Swap in failed on %s" % ",".join(failed))
1754                log.error("Swap in failed on %s" % ",".join(failed))
1755                return
1756        else:
1757            log.info("[start_segment]: Experiment %s active" % eid)
1758
1759        log.debug("[start_experiment]: removing %s" % tmpdir)
1760
1761        # Walk up tmpdir, deleting as we go
1762        for path, dirs, files in os.walk(tmpdir, topdown=False):
1763            for f in files:
1764                os.remove(os.path.join(path, f))
1765            for d in dirs:
1766                os.rmdir(os.path.join(path, d))
1767        os.rmdir(tmpdir)
1768
1769        # Insert the experiment into our state and update the disk copy
1770        self.state_lock.acquire()
1771        self.state[expid]['experimentStatus'] = 'active'
1772        self.state[eid] = self.state[expid]
1773        if self.state_filename: self.write_state()
1774        self.state_lock.release()
1775        return
1776
1777    def create_experiment(self, req, fid):
1778        """
1779        The external interface to experiment creation called from the
1780        dispatcher.
1781
1782        Creates a working directory, splits the incoming description using the
1783        splitter script and parses out the avrious subsections using the
1784        lcasses above.  Once each sub-experiment is created, use pooled threads
1785        to instantiate them and start it all up.
1786        """
1787
1788        if not self.auth.check_attribute(fid, 'create'):
1789            raise service_error(service_error.access, "Create access denied")
1790
1791        try:
1792            tmpdir = tempfile.mkdtemp(prefix="split-")
1793        except IOError:
1794            raise service_error(service_error.internal, "Cannot create tmp dir")
1795
1796        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1797        gw_secretkey_base = "fed.%s" % self.ssh_type
1798        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1799        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1800        tclfile = tmpdir + "/experiment.tcl"
1801        tbparams = { }
1802        try:
1803            access_user = self.accessdb[fid]
1804        except KeyError:
1805            raise service_error(service_error.internal,
1806                    "Access map and authorizer out of sync in " + \
1807                            "create_experiment for fedid %s"  % fid)
1808
1809        pid = "dummy"
1810        gid = "dummy"
1811        try:
1812            os.mkdir(tmpdir+"/keys")
1813        except OSError:
1814            raise service_error(service_error.internal,
1815                    "Can't make temporary dir")
1816
1817        req = req.get('CreateRequestBody', None)
1818        if not req:
1819            raise service_error(service_error.req,
1820                    "Bad request format (no CreateRequestBody)")
1821        # The tcl parser needs to read a file so put the content into that file
1822        descr=req.get('experimentdescription', None)
1823        if descr:
1824            file_content=descr.get('ns2description', None)
1825            if file_content:
1826                try:
1827                    f = open(tclfile, 'w')
1828                    f.write(file_content)
1829                    f.close()
1830                except IOError:
1831                    raise service_error(service_error.internal,
1832                            "Cannot write temp experiment description")
1833            else:
1834                raise service_error(service_error.req, 
1835                        "Only ns2descriptions supported")
1836        else:
1837            raise service_error(service_error.req, "No experiment description")
1838
1839        # Generate an ID for the experiment (slice) and a certificate that the
1840        # allocator can use to prove they own it.  We'll ship it back through
1841        # the encrypted connection.
1842        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1843
1844        if req.has_key('experimentID') and \
1845                req['experimentID'].has_key('localname'):
1846            overwrite = False
1847            eid = req['experimentID']['localname']
1848            # If there's an old failed experiment here with the same local name
1849            # and accessible by this user, we'll overwrite it, otherwise we'll
1850            # fall through and do the collision avoidance.
1851            old_expid = self.get_experiment_fedid(eid)
1852            if old_expid and self.check_experiment_access(fid, old_expid):
1853                self.state_lock.acquire()
1854                status = self.state[eid].get('experimentStatus', None)
1855                if status and status == 'failed':
1856                    # remove the old access attribute
1857                    self.auth.unset_attribute(fid, old_expid)
1858                    overwrite = True
1859                    del self.state[eid]
1860                    del self.state[old_expid]
1861                self.state_lock.release()
1862            self.state_lock.acquire()
1863            while (self.state.has_key(eid) and not overwrite):
1864                eid += random.choice(string.ascii_letters)
1865            # Initial state
1866            self.state[eid] = {
1867                    'experimentID' : \
1868                            [ { 'localname' : eid }, {'fedid': expid } ],
1869                    'experimentStatus': 'starting',
1870                    'experimentAccess': { 'X509' : expcert },
1871                    'owner': fid,
1872                    'log' : [],
1873                }
1874            self.state[expid] = self.state[eid]
1875            if self.state_filename: self.write_state()
1876            self.state_lock.release()
1877        else:
1878            eid = self.exp_stem
1879            for i in range(0,5):
1880                eid += random.choice(string.ascii_letters)
1881            self.state_lock.acquire()
1882            while (self.state.has_key(eid)):
1883                eid = self.exp_stem
1884                for i in range(0,5):
1885                    eid += random.choice(string.ascii_letters)
1886            # Initial state
1887            self.state[eid] = {
1888                    'experimentID' : \
1889                            [ { 'localname' : eid }, {'fedid': expid } ],
1890                    'experimentStatus': 'starting',
1891                    'experimentAccess': { 'X509' : expcert },
1892                    'owner': fid,
1893                    'log' : [],
1894                }
1895            self.state[expid] = self.state[eid]
1896            if self.state_filename: self.write_state()
1897            self.state_lock.release()
1898
1899        try: 
1900            # This catches exceptions to clear the placeholder if necessary
1901            try:
1902                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1903            except ValueError:
1904                raise service_error(service_error.server_config, 
1905                        "Bad key type (%s)" % self.ssh_type)
1906
1907            user = req.get('user', None)
1908            if user == None:
1909                raise service_error(service_error.req, "No user")
1910
1911            master = req.get('master', None)
1912            if not master:
1913                raise service_error(service_error.req,
1914                        "No master testbed label")
1915            export_project = req.get('exportProject', None)
1916            if not export_project:
1917                raise service_error(service_error.req, "No export project")
1918           
1919            if self.splitter_url:
1920                self.log.debug("Calling remote splitter at %s" % \
1921                        self.splitter_url)
1922                split_data = self.remote_splitter(self.splitter_url,
1923                        file_content, master)
1924            else:
1925                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1926                    str(self.muxmax), '-m', master]
1927
1928                if self.fedkit:
1929                    tclcmd.append('-k')
1930
1931                if self.gatewaykit:
1932                    tclcmd.append('-K')
1933
1934                tclcmd.extend([pid, gid, eid, tclfile])
1935
1936                self.log.debug("running local splitter %s", " ".join(tclcmd))
1937                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True)
1938                split_data = tclparser.stdout
1939
1940            allocated = { }         # Testbeds we can access
1941            # Objects to parse the splitter output (defined above)
1942            parse_current_testbed = self.current_testbed(eid, tmpdir,
1943                    self.fedkit, self.gatewaykit)
1944            parse_allbeds = self.allbeds(self.get_access)
1945            parse_gateways = self.gateways(eid, master, tmpdir,
1946                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1947                    self.fedkit)
1948            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1949                        "^#\s+End\s+Vtopo")
1950            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1951                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1952            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1953                    "^#\s+End\s+tarfiles")
1954            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1955                    "^#\s+End\s+rpms")
1956
1957            # Working on the split data
1958            for line in split_data:
1959                line = line.rstrip()
1960                if parse_current_testbed(line, master, allocated, tbparams):
1961                    continue
1962                elif parse_allbeds(line, user, tbparams, master, export_project,
1963                        access_user):
1964                    continue
1965                elif parse_gateways(line, allocated, tbparams):
1966                    continue
1967                elif parse_vtopo(line):
1968                    continue
1969                elif parse_hostnames(line):
1970                    continue
1971                elif parse_tarfiles(line):
1972                    continue
1973                elif parse_rpms(line):
1974                    continue
1975                else:
1976                    raise service_error(service_error.internal, 
1977                            "Bad tcl parse? %s" % line)
1978            # Virtual topology and visualization
1979            vtopo = self.gentopo(parse_vtopo.str)
1980            if not vtopo:
1981                raise service_error(service_error.internal, 
1982                        "Failed to generate virtual topology")
1983
1984            vis = self.genviz(vtopo)
1985            if not vis:
1986                raise service_error(service_error.internal, 
1987                        "Failed to generate visualization")
1988
1989           
1990            # save federant information
1991            for k in allocated.keys():
1992                tbparams[k]['federant'] = {\
1993                        'name': [ { 'localname' : eid} ],\
1994                        'emulab': tbparams[k]['emulab'],\
1995                        'allocID' : tbparams[k]['allocID'],\
1996                        'master' : k == master,\
1997                    }
1998
1999            self.state_lock.acquire()
2000            self.state[eid]['vtopo'] = vtopo
2001            self.state[eid]['vis'] = vis
2002            self.state[expid]['federant'] = \
2003                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2004                        if tbparams[tb].has_key('federant') ]
2005            if self.state_filename: self.write_state()
2006            self.state_lock.release()
2007
2008            # Copy tarfiles and rpms needed at remote sites into a staging area
2009            try:
2010                if self.fedkit:
2011                    for t in self.fedkit:
2012                        parse_tarfiles.list.append(t[1])
2013                if self.gatewaykit:
2014                    for t in self.gatewaykit:
2015                        parse_tarfiles.list.append(t[1])
2016                for t in parse_tarfiles.list:
2017                    if not os.path.exists("%s/tarfiles" % tmpdir):
2018                        os.mkdir("%s/tarfiles" % tmpdir)
2019                    self.copy_file(t, "%s/tarfiles/%s" % \
2020                            (tmpdir, os.path.basename(t)))
2021                for r in parse_rpms.list:
2022                    if not os.path.exists("%s/rpms" % tmpdir):
2023                        os.mkdir("%s/rpms" % tmpdir)
2024                    self.copy_file(r, "%s/rpms/%s" % \
2025                            (tmpdir, os.path.basename(r)))
2026                # A null experiment file in case we need to create a remote
2027                # experiment from scratch
2028                f = open("%s/null.tcl" % tmpdir, "w")
2029                print >>f, """
2030set ns [new Simulator]
2031source tb_compat.tcl
2032
2033set a [$ns node]
2034
2035$ns rtproto Session
2036$ns run
2037"""
2038                f.close()
2039
2040            except IOError, e:
2041                raise service_error(service_error.internal, 
2042                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2043
2044        except service_error, e:
2045            # If something goes wrong in the parse (usually an access error)
2046            # clear the placeholder state.  From here on out the code delays
2047            # exceptions.  Failing at this point returns a fault to the remote
2048            # caller.
2049            self.state_lock.acquire()
2050            del self.state[eid]
2051            del self.state[expid]
2052            if self.state_filename: self.write_state()
2053            self.state_lock.release()
2054            raise e
2055
2056
2057        # Start the background swapper and return the starting state.  From
2058        # here on out, the state will stick around a while.
2059
2060        # Let users touch the state
2061        self.auth.set_attribute(fid, expid)
2062        self.auth.set_attribute(expid, expid)
2063        # Override fedids can manipulate state as well
2064        for o in self.overrides:
2065            self.auth.set_attribute(o, expid)
2066
2067        # Create a logger that logs to the experiment's state object as well as
2068        # to the main log file.
2069        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2070        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2071        # XXX: there should be a global one of these rather than repeating the
2072        # code.
2073        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2074                    '%d %b %y %H:%M:%S'))
2075        alloc_log.addHandler(h)
2076       
2077        # Start a thread to do the resource allocation
2078        t  = Thread(target=self.allocate_resources,
2079                args=(allocated, master, eid, expid, expcert, tbparams, 
2080                    tmpdir, alloc_log),
2081                name=eid)
2082        t.start()
2083
2084        rv = {
2085                'experimentID': [
2086                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2087                ],
2088                'experimentStatus': 'starting',
2089                'experimentAccess': { 'X509' : expcert }
2090            }
2091
2092        return rv
2093   
2094    def get_experiment_fedid(self, key):
2095        """
2096        find the fedid associated with the localname key in the state database.
2097        """
2098
2099        rv = None
2100        self.state_lock.acquire()
2101        if self.state.has_key(key):
2102            if isinstance(self.state[key], dict):
2103                try:
2104                    kl = [ f['fedid'] for f in \
2105                            self.state[key]['experimentID']\
2106                                if f.has_key('fedid') ]
2107                except KeyError:
2108                    self.state_lock.release()
2109                    raise service_error(service_error.internal, 
2110                            "No fedid for experiment %s when getting "+\
2111                                    "fedid(!?)" % key)
2112                if len(kl) == 1:
2113                    rv = kl[0]
2114                else:
2115                    self.state_lock.release()
2116                    raise service_error(service_error.internal, 
2117                            "multiple fedids for experiment %s when " +\
2118                                    "getting fedid(!?)" % key)
2119            else:
2120                self.state_lock.release()
2121                raise service_error(service_error.internal, 
2122                        "Unexpected state for %s" % key)
2123        self.state_lock.release()
2124        return rv
2125
2126    def check_experiment_access(self, fid, key):
2127        """
2128        Confirm that the fid has access to the experiment.  Though a request
2129        may be made in terms of a local name, the access attribute is always
2130        the experiment's fedid.
2131        """
2132        if not isinstance(key, fedid):
2133            key = self.get_experiment_fedid(key)
2134
2135        if self.auth.check_attribute(fid, key):
2136            return True
2137        else:
2138            raise service_error(service_error.access, "Access Denied")
2139
2140
2141
2142    def get_vtopo(self, req, fid):
2143        """
2144        Return the stored virtual topology for this experiment
2145        """
2146        rv = None
2147        state = None
2148
2149        req = req.get('VtopoRequestBody', None)
2150        if not req:
2151            raise service_error(service_error.req,
2152                    "Bad request format (no VtopoRequestBody)")
2153        exp = req.get('experiment', None)
2154        if exp:
2155            if exp.has_key('fedid'):
2156                key = exp['fedid']
2157                keytype = "fedid"
2158            elif exp.has_key('localname'):
2159                key = exp['localname']
2160                keytype = "localname"
2161            else:
2162                raise service_error(service_error.req, "Unknown lookup type")
2163        else:
2164            raise service_error(service_error.req, "No request?")
2165
2166        self.check_experiment_access(fid, key)
2167
2168        self.state_lock.acquire()
2169        if self.state.has_key(key):
2170            if self.state[key].has_key('vtopo'):
2171                rv = { 'experiment' : {keytype: key },\
2172                        'vtopo': self.state[key]['vtopo'],\
2173                    }
2174            else:
2175                state = self.state[key]['experimentStatus']
2176        self.state_lock.release()
2177
2178        if rv: return rv
2179        else: 
2180            if state:
2181                raise service_error(service_error.partial, 
2182                        "Not ready: %s" % state)
2183            else:
2184                raise service_error(service_error.req, "No such experiment")
2185
2186    def get_vis(self, req, fid):
2187        """
2188        Return the stored visualization for this experiment
2189        """
2190        rv = None
2191        state = None
2192
2193        req = req.get('VisRequestBody', None)
2194        if not req:
2195            raise service_error(service_error.req,
2196                    "Bad request format (no VisRequestBody)")
2197        exp = req.get('experiment', None)
2198        if exp:
2199            if exp.has_key('fedid'):
2200                key = exp['fedid']
2201                keytype = "fedid"
2202            elif exp.has_key('localname'):
2203                key = exp['localname']
2204                keytype = "localname"
2205            else:
2206                raise service_error(service_error.req, "Unknown lookup type")
2207        else:
2208            raise service_error(service_error.req, "No request?")
2209
2210        self.check_experiment_access(fid, key)
2211
2212        self.state_lock.acquire()
2213        if self.state.has_key(key):
2214            if self.state[key].has_key('vis'):
2215                rv =  { 'experiment' : {keytype: key },\
2216                        'vis': self.state[key]['vis'],\
2217                        }
2218            else:
2219                state = self.state[key]['experimentStatus']
2220        self.state_lock.release()
2221
2222        if rv: return rv
2223        else:
2224            if state:
2225                raise service_error(service_error.partial, 
2226                        "Not ready: %s" % state)
2227            else:
2228                raise service_error(service_error.req, "No such experiment")
2229
2230    def clean_info_response(self, rv):
2231        """
2232        Remove the information in the experiment's state object that is not in
2233        the info response.
2234        """
2235        # Remove the owner info (should always be there, but...)
2236        if rv.has_key('owner'): del rv['owner']
2237
2238        # Convert the log into the allocationLog parameter and remove the
2239        # log entry (with defensive programming)
2240        if rv.has_key('log'):
2241            rv['allocationLog'] = "".join(rv['log'])
2242            del rv['log']
2243        else:
2244            rv['allocationLog'] = ""
2245
2246        if rv['experimentStatus'] != 'active':
2247            if rv.has_key('federant'): del rv['federant']
2248        else:
2249            # remove the allocationID info from each federant
2250            for f in rv.get('federant', []):
2251                if f.has_key('allocID'): del f['allocID']
2252        return rv
2253
2254    def get_info(self, req, fid):
2255        """
2256        Return all the stored info about this experiment
2257        """
2258        rv = None
2259
2260        req = req.get('InfoRequestBody', None)
2261        if not req:
2262            raise service_error(service_error.req,
2263                    "Bad request format (no InfoRequestBody)")
2264        exp = req.get('experiment', None)
2265        if exp:
2266            if exp.has_key('fedid'):
2267                key = exp['fedid']
2268                keytype = "fedid"
2269            elif exp.has_key('localname'):
2270                key = exp['localname']
2271                keytype = "localname"
2272            else:
2273                raise service_error(service_error.req, "Unknown lookup type")
2274        else:
2275            raise service_error(service_error.req, "No request?")
2276
2277        self.check_experiment_access(fid, key)
2278
2279        # The state may be massaged by the service function that called
2280        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2281        # state.
2282        self.state_lock.acquire()
2283        if self.state.has_key(key):
2284            rv = copy.deepcopy(self.state[key])
2285        self.state_lock.release()
2286
2287        if rv:
2288            return self.clean_info_response(rv)
2289        else:
2290            raise service_error(service_error.req, "No such experiment")
2291
2292    def get_multi_info(self, req, fid):
2293        """
2294        Return all the stored info that this fedid can access
2295        """
2296        rv = { 'info': [ ] }
2297
2298        self.state_lock.acquire()
2299        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2300            self.check_experiment_access(fid, key)
2301
2302            if self.state.has_key(key):
2303                e = copy.deepcopy(self.state[key])
2304                e = self.clean_info_response(e)
2305                rv['info'].append(e)
2306        self.state_lock.release()
2307        return rv
2308
2309
2310    def terminate_experiment(self, req, fid):
2311        """
2312        Swap this experiment out on the federants and delete the shared
2313        information
2314        """
2315        tbparams = { }
2316        req = req.get('TerminateRequestBody', None)
2317        if not req:
2318            raise service_error(service_error.req,
2319                    "Bad request format (no TerminateRequestBody)")
2320        force = req.get('force', False)
2321        exp = req.get('experiment', None)
2322        if exp:
2323            if exp.has_key('fedid'):
2324                key = exp['fedid']
2325                keytype = "fedid"
2326            elif exp.has_key('localname'):
2327                key = exp['localname']
2328                keytype = "localname"
2329            else:
2330                raise service_error(service_error.req, "Unknown lookup type")
2331        else:
2332            raise service_error(service_error.req, "No request?")
2333
2334        self.check_experiment_access(fid, key)
2335
2336        dealloc_list = [ ]
2337
2338
2339        # Create a logger that logs to the dealloc_list as well as to the main
2340        # log file.
2341        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2342        h = logging.StreamHandler(self.list_log(dealloc_list))
2343        # XXX: there should be a global one of these rather than repeating the
2344        # code.
2345        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2346                    '%d %b %y %H:%M:%S'))
2347        dealloc_log.addHandler(h)
2348
2349        self.state_lock.acquire()
2350        fed_exp = self.state.get(key, None)
2351
2352        if fed_exp:
2353            # This branch of the conditional holds the lock to generate a
2354            # consistent temporary tbparams variable to deallocate experiments.
2355            # It releases the lock to do the deallocations and reacquires it to
2356            # remove the experiment state when the termination is complete.
2357
2358            # First make sure that the experiment creation is complete.
2359            status = fed_exp.get('experimentStatus', None)
2360
2361            if status:
2362                if status in ('starting', 'terminating'):
2363                    if not force:
2364                        self.state_lock.release()
2365                        raise service_error(service_error.partial, 
2366                                'Experiment still being created or destroyed')
2367                    else:
2368                        self.log.warning('Experiment in %s state ' % status + \
2369                                'being terminated by force.')
2370            else:
2371                # No status??? trouble
2372                self.state_lock.release()
2373                raise service_error(service_error.internal,
2374                        "Experiment has no status!?")
2375
2376            ids = []
2377            #  experimentID is a list of dicts that are self-describing
2378            #  identifiers.  This finds all the fedids and localnames - the
2379            #  keys of self.state - and puts them into ids.
2380            for id in fed_exp.get('experimentID', []):
2381                if id.has_key('fedid'): ids.append(id['fedid'])
2382                if id.has_key('localname'): ids.append(id['localname'])
2383
2384            # Construct enough of the tbparams to make the stop_segment calls
2385            # work
2386            for fed in fed_exp.get('federant', []):
2387                try:
2388                    for e in fed['name']:
2389                        eid = e.get('localname', None)
2390                        if eid: break
2391                    else:
2392                        continue
2393
2394                    p = fed['emulab']['project']
2395
2396                    project = p['name']['localname']
2397                    tb = p['testbed']['localname']
2398                    user = p['user'][0]['userID']['localname']
2399
2400                    domain = fed['emulab']['domain']
2401                    host  = fed['emulab']['ops']
2402                    aid = fed['allocID']
2403                except KeyError, e:
2404                    continue
2405                tbparams[tb] = {\
2406                        'user': user,\
2407                        'domain': domain,\
2408                        'project': project,\
2409                        'host': host,\
2410                        'eid': eid,\
2411                        'aid': aid,\
2412                    }
2413            fed_exp['experimentStatus'] = 'terminating'
2414            if self.state_filename: self.write_state()
2415            self.state_lock.release()
2416
2417            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2418            # then completes, so we can't wait if nothing starts.  So, no
2419            # tbparams, no start.
2420            if len(tbparams) > 0:
2421                thread_pool = self.thread_pool(self.nthreads)
2422                for tb in tbparams.keys():
2423                    # Create and start a thread to stop the segment
2424                    thread_pool.wait_for_slot()
2425                    t  = self.pooled_thread(\
2426                            target=self.stop_segment(log=dealloc_log,
2427                                keyfile=self.ssh_privkey_file, debug=self.debug), 
2428                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2429                            pdata=thread_pool, trace_file=self.trace_file)
2430                    t.start()
2431                # Wait for completions
2432                thread_pool.wait_for_all_done()
2433
2434            # release the allocations (failed experiments have done this
2435            # already, and starting experiments may be in odd states, so we
2436            # ignore errors releasing those allocations
2437            try: 
2438                for tb in tbparams.keys():
2439                    self.release_access(tb, tbparams[tb]['aid'])
2440            except service_error, e:
2441                if status != 'failed' and not force:
2442                    raise e
2443
2444            # Remove the terminated experiment
2445            self.state_lock.acquire()
2446            for id in ids:
2447                if self.state.has_key(id): del self.state[id]
2448
2449            if self.state_filename: self.write_state()
2450            self.state_lock.release()
2451
2452            return { 
2453                    'experiment': exp , 
2454                    'deallocationLog': "".join(dealloc_list),
2455                    }
2456        else:
2457            # Don't forget to release the lock
2458            self.state_lock.release()
2459            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.