source: fedd/federation/experiment_control.py @ ca489e8

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since ca489e8 was ca489e8, checked in by Ted Faber <faber@…>, 15 years ago

Add override fedids that can access all experiments. These are admins,
basically.

Also add a force parameter to terminate requests, to force the termination (and
really deletion of internal state) for experiments in odd states or with
internal misconfigurations.

  • Property mode set to 100644
File size: 89.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull)
541
542            return rv == 0
543
544        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
545            """
546            Run a remote command on host as user.  If debug is set, the action
547            is only logged.  Commands are run without stdin, to avoid stray
548            SIGTTINs.
549            """
550            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
551                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
552                    (self.ssh_exec, self.ssh_privkey_file, 
553                            user, host, cmd)
554
555            try:
556                dnull = open("/dev/null", "r")
557            except IOError:
558                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
559                        "for redirect")
560                dnull = Null
561
562            self.log.debug("[ssh_cmd]: %s" % sh_str)
563            if not self.debug:
564                if dnull:
565                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
566                else:
567                    sub = Popen(sh_str, shell=True)
568                if timeout:
569                    i = 0
570                    rv = sub.poll()
571                    while i < timeout:
572                        if rv is not None: break
573                        else:
574                            time.sleep(1)
575                            rv = sub.poll()
576                            i += 1
577                    else:
578                        self.log.debug("Process exceeded runtime: %s" % sh_str)
579                        os.kill(sub.pid, signal.SIGKILL)
580                        raise self.ssh_cmd_timeout();
581                    return rv == 0
582                else:
583                    return sub.wait() == 0
584            else:
585                return True
586
587    class start_segment(emulab_segment):
588        def __init__(self, log=None, keyfile=None, debug=False):
589            experiment_control_local.emulab_segment.__init__(self,
590                    log=log, keyfile=keyfile, debug=debug)
591
592        def create_config_tree(self, src_dir, dest_dir, script):
593            """
594            Append commands to script that will create the directory hierarchy
595            on the remote federant.
596            """
597
598            if os.path.isdir(src_dir):
599                print >>script, "mkdir -p %s" % dest_dir
600                print >>script, "chmod 770 %s" % dest_dir
601
602                for f in os.listdir(src_dir):
603                    if os.path.isdir(f):
604                        self.create_config_tree("%s/%s" % (src_dir, f), 
605                                "%s/%s" % (dest_dir, f), script)
606            else:
607                self.log.debug("[create_config_tree]: Not a directory: %s" \
608                        % src_dir)
609
610        def ship_configs(self, host, user, src_dir, dest_dir):
611            """
612            Copy federant-specific configuration files to the federant.
613            """
614            for f in os.listdir(src_dir):
615                if os.path.isdir(f):
616                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
617                            "%s/%s" % (dest_dir, f)):
618                        return False
619                else:
620                    if not self.scp_file("%s/%s" % (src_dir, f), 
621                            user, host, dest_dir):
622                        return False
623            return True
624
625        def get_state(self, user, host, tb, pid, eid):
626            # command to test experiment state
627            expinfo_exec = "/usr/testbed/bin/expinfo" 
628            # Regular expressions to parse the expinfo response
629            state_re = re.compile("State:\s+(\w+)")
630            no_exp_re = re.compile("^No\s+such\s+experiment")
631            swapping_re = re.compile("^No\s+information\s+available.")
632            state = None    # Experiment state parsed from expinfo
633            # The expinfo ssh command.  Note the identity restriction to use
634            # only the identity provided in the pubkey given.
635            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
636                    'StrictHostKeyChecking yes', '-i', 
637                    self.ssh_privkey_file, "%s@%s" % (user, host), 
638                    expinfo_exec, pid, eid]
639
640            dev_null = None
641            try:
642                dev_null = open("/dev/null", "a")
643            except IOError, e:
644                self.log.error("[get_state]: can't open /dev/null: %s" %e)
645
646            if self.debug:
647                state = 'swapped'
648                rv = 0
649            else:
650                status = Popen(cmd, stdout=PIPE, stderr=dev_null)
651                for line in status.stdout:
652                    m = state_re.match(line)
653                    if m: state = m.group(1)
654                    else:
655                        for reg, st in ((no_exp_re, "none"),
656                                (swapping_re, "swapping")):
657                            m = reg.match(line)
658                            if m: state = st
659                rv = status.wait()
660
661            # If the experiment is not present the subcommand returns a
662            # non-zero return value.  If we successfully parsed a "none"
663            # outcome, ignore the return code.
664            if rv != 0 and state != 'none':
665                raise service_error(service_error.internal,
666                        "Cannot get status of segment %s:%s/%s" % \
667                                (tb, pid, eid))
668            elif state not in ('active', 'swapped', 'swapping', 'none'):
669                raise service_error(service_error.internal,
670                        "Cannot get status of segment %s:%s/%s" % \
671                                (tb, pid, eid))
672            else: return state
673
674
675        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
676            """
677            Start a sub-experiment on a federant.
678
679            Get the current state, modify or create as appropriate, ship data
680            and configs and start the experiment.  There are small ordering
681            differences based on the initial state of the sub-experiment.
682            """
683            # ops node in the federant
684            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
685            user = tbparams[tb]['user']     # federant user
686            pid = tbparams[tb]['project']   # federant project
687            # XXX
688            base_confs = ( "hosts",)
689            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
690            # Configuration directories on the remote machine
691            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
692            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
693            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
694
695            state = self.get_state(user, host, tb, pid, eid)
696
697            self.log.debug("[start_segment]: %s: %s" % (tb, state))
698            self.log.info("[start_segment]:transferring experiment to %s" % tb)
699
700            if not self.scp_file("%s/%s/%s" % \
701                    (tmpdir, tb, tclfile), user, host):
702                return False
703           
704            if state == 'none':
705                # Create a null copy of the experiment so that we capture any
706                # logs there if the modify fails.  Emulab software discards the
707                # logs from a failed startexp
708                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
709                    return False
710                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
711                timedout = False
712                try:
713                    if not self.ssh_cmd(user, host,
714                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
715                            "-e %s null.tcl") % (pid, eid), "startexp",
716                            timeout=60 * 10):
717                        return False
718                except self.ssh_cmd_timeout:
719                    timedout = True
720
721                if timedout:
722                    state = self.get_state(user, host, self.ssh_privkey_file, 
723                            tb, eid, pid)
724                    if state != "swapped":
725                        return False
726
727           
728            # Open up a temporary file to contain a script for setting up the
729            # filespace for the new experiment.
730            self.log.info("[start_segment]: creating script file")
731            try:
732                sf, scriptname = tempfile.mkstemp()
733                scriptfile = os.fdopen(sf, 'w')
734            except IOError:
735                return False
736
737            scriptbase = os.path.basename(scriptname)
738
739            # Script the filesystem changes
740            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
741            # Clear and create the tarfiles and rpm directories
742            for d in (tarfiles_dir, rpms_dir):
743                print >>scriptfile, "/bin/rm -rf %s/*" % d
744                print >>scriptfile, "mkdir -p %s" % d
745            print >>scriptfile, 'mkdir -p %s' % proj_dir
746            self.create_config_tree("%s/%s" % (tmpdir, tb),
747                    proj_dir, scriptfile)
748            if os.path.isdir("%s/tarfiles" % tmpdir):
749                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
750                        scriptfile)
751            if os.path.isdir("%s/rpms" % tmpdir):
752                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
753                        scriptfile)
754            print >>scriptfile, "rm -f %s" % scriptbase
755            scriptfile.close()
756
757            # Move the script to the remote machine
758            # XXX: could collide tempfile names on the remote host
759            if self.scp_file(scriptname, user, host, scriptbase):
760                os.remove(scriptname)
761            else:
762                return False
763
764            # Execute the script (and the script's last line deletes it)
765            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
766                return False
767
768            for f in base_confs:
769                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
770                        "%s/%s" % (proj_dir, f)):
771                    return False
772            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
773                    proj_dir):
774                return False
775            if os.path.isdir("%s/tarfiles" % tmpdir):
776                if not self.ship_configs(host, user,
777                        "%s/tarfiles" % tmpdir, tarfiles_dir):
778                    return False
779            if os.path.isdir("%s/rpms" % tmpdir):
780                if not self.ship_configs(host, user,
781                        "%s/rpms" % tmpdir, tarfiles_dir):
782                    return False
783            # Stage the new configuration (active experiments will stay swapped
784            # in now)
785            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
786            try:
787                if not self.ssh_cmd(user, host,
788                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
789                                (pid, eid, tclfile),
790                        "modexp", timeout= 60 * 10):
791                    return False
792            except self.ssh_cmd_timeout:
793                print "modexp timeout"
794                # There's really no way to see if this succeeded or failed, so
795                # if it hangs, assume the worst.
796                return False
797            # Active experiments are still swapped, this swaps the others in.
798            if state != 'active':
799                self.log.info("[start_segment]: Swapping %s in on %s" % \
800                        (eid, tb))
801                timedout = False
802                try:
803                    if not self.ssh_cmd(user, host,
804                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
805                            "swapexp", timeout=10*60):
806                        return False
807                except self.ssh_cmd_timeout:
808                    timedout = True
809               
810                # If the command was terminated, but completed successfully,
811                # report success.
812                if timedout:
813                    state = self.get_state(user, host, self.ssh_privkey_file,
814                            tb, eid, pid)
815                    self.log.debug("[start_segment]: swapin timed out (state)")
816                    return state == 'active'
817            # Everything has gone OK.
818            return True
819
820    class stop_segment(emulab_segment):
821        def __init__(self, log=None, keyfile=None, debug=False):
822            experiment_control_local.emulab_segment.__init__(self,
823                    log=log, keyfile=keyfile, debug=debug)
824
825        def __call__(self, tb, eid, tbparams):
826            """
827            Stop a sub experiment by calling swapexp on the federant
828            """
829            user = tbparams[tb]['user']
830            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
831            pid = tbparams[tb]['project']
832
833            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
834            return self.ssh_cmd(user, host,
835                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
836
837       
838    def generate_ssh_keys(self, dest, type="rsa" ):
839        """
840        Generate a set of keys for the gateways to use to talk.
841
842        Keys are of type type and are stored in the required dest file.
843        """
844        valid_types = ("rsa", "dsa")
845        t = type.lower();
846        if t not in valid_types: raise ValueError
847        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
848
849        try:
850            trace = open("/dev/null", "w")
851        except IOError:
852            raise service_error(service_error.internal,
853                    "Cannot open /dev/null??");
854
855        # May raise CalledProcessError
856        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
857        rv = call(cmd, stdout=trace, stderr=trace)
858        if rv != 0:
859            raise service_error(service_error.internal, 
860                    "Cannot generate nonce ssh keys.  %s return code %d" \
861                            % (self.ssh_keygen, rv))
862
863    def gentopo(self, str):
864        """
865        Generate the topology dtat structure from the splitter's XML
866        representation of it.
867
868        The topology XML looks like:
869            <experiment>
870                <nodes>
871                    <node><vname></vname><ips>ip1:ip2</ips></node>
872                </nodes>
873                <lans>
874                    <lan>
875                        <vname></vname><vnode></vnode><ip></ip>
876                        <bandwidth></bandwidth><member>node:port</member>
877                    </lan>
878                </lans>
879        """
880        class topo_parse:
881            """
882            Parse the topology XML and create the dats structure.
883            """
884            def __init__(self):
885                # Typing of the subelements for data conversion
886                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
887                self.int_subelements = ( 'bandwidth',)
888                self.float_subelements = ( 'delay',)
889                # The final data structure
890                self.nodes = [ ]
891                self.lans =  [ ]
892                self.topo = { \
893                        'node': self.nodes,\
894                        'lan' : self.lans,\
895                    }
896                self.element = { }  # Current element being created
897                self.chars = ""     # Last text seen
898
899            def end_element(self, name):
900                # After each sub element the contents is added to the current
901                # element or to the appropriate list.
902                if name == 'node':
903                    self.nodes.append(self.element)
904                    self.element = { }
905                elif name == 'lan':
906                    self.lans.append(self.element)
907                    self.element = { }
908                elif name in self.str_subelements:
909                    self.element[name] = self.chars
910                    self.chars = ""
911                elif name in self.int_subelements:
912                    self.element[name] = int(self.chars)
913                    self.chars = ""
914                elif name in self.float_subelements:
915                    self.element[name] = float(self.chars)
916                    self.chars = ""
917
918            def found_chars(self, data):
919                self.chars += data.rstrip()
920
921
922        tp = topo_parse();
923        parser = xml.parsers.expat.ParserCreate()
924        parser.EndElementHandler = tp.end_element
925        parser.CharacterDataHandler = tp.found_chars
926
927        parser.Parse(str)
928
929        return tp.topo
930       
931
932    def genviz(self, topo):
933        """
934        Generate the visualization the virtual topology
935        """
936
937        neato = "/usr/local/bin/neato"
938        # These are used to parse neato output and to create the visualization
939        # file.
940        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
941        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
942                "%s</type></node>"
943
944        try:
945            # Node names
946            nodes = [ n['vname'] for n in topo['node'] ]
947            topo_lans = topo['lan']
948        except KeyError:
949            raise service_error(service_error.internal, "Bad topology")
950
951        lans = { }
952        links = { }
953
954        # Walk through the virtual topology, organizing the connections into
955        # 2-node connections (links) and more-than-2-node connections (lans).
956        # When a lan is created, it's added to the list of nodes (there's a
957        # node in the visualization for the lan).
958        for l in topo_lans:
959            if links.has_key(l['vname']):
960                if len(links[l['vname']]) < 2:
961                    links[l['vname']].append(l['vnode'])
962                else:
963                    nodes.append(l['vname'])
964                    lans[l['vname']] = links[l['vname']]
965                    del links[l['vname']]
966                    lans[l['vname']].append(l['vnode'])
967            elif lans.has_key(l['vname']):
968                lans[l['vname']].append(l['vnode'])
969            else:
970                links[l['vname']] = [ l['vnode'] ]
971
972
973        # Open up a temporary file for dot to turn into a visualization
974        try:
975            df, dotname = tempfile.mkstemp()
976            dotfile = os.fdopen(df, 'w')
977        except IOError:
978            raise service_error(service_error.internal,
979                    "Failed to open file in genviz")
980
981        # Generate a dot/neato input file from the links, nodes and lans
982        try:
983            print >>dotfile, "graph G {"
984            for n in nodes:
985                print >>dotfile, '\t"%s"' % n
986            for l in links.keys():
987                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
988            for l in lans.keys():
989                for n in lans[l]:
990                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
991            print >>dotfile, "}"
992            dotfile.close()
993        except TypeError:
994            raise service_error(service_error.internal,
995                    "Single endpoint link in vtopo")
996        except IOError:
997            raise service_error(service_error.internal, "Cannot write dot file")
998
999        # Use dot to create a visualization
1000        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1001                '-Gpack=true', dotname], stdout=PIPE)
1002
1003        # Translate dot to vis format
1004        vis_nodes = [ ]
1005        vis = { 'node': vis_nodes }
1006        for line in dot.stdout:
1007            m = vis_re.match(line)
1008            if m:
1009                vn = m.group(1)
1010                vis_node = {'name': vn, \
1011                        'x': float(m.group(2)),\
1012                        'y' : float(m.group(3)),\
1013                    }
1014                if vn in links.keys() or vn in lans.keys():
1015                    vis_node['type'] = 'lan'
1016                else:
1017                    vis_node['type'] = 'node'
1018                vis_nodes.append(vis_node)
1019        rv = dot.wait()
1020
1021        os.remove(dotname)
1022        if rv == 0 : return vis
1023        else: return None
1024
1025    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1026            access_user):
1027        """
1028        Get access to testbed through fedd and set the parameters for that tb
1029        """
1030        uri = self.tbmap.get(tb, None)
1031        if not uri:
1032            raise service_error(serice_error.server_config, 
1033                    "Unknown testbed: %s" % tb)
1034
1035        # currently this lumps all users into one service access group
1036        service_keys = [ a for u in user \
1037                for a in u.get('access', []) \
1038                    if a.has_key('sshPubkey')]
1039
1040        if len(service_keys) == 0:
1041            raise service_error(service_error.req, 
1042                    "Must have at least one SSH pubkey for services")
1043
1044
1045        for p, u in access_user:
1046            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1047                    "to %s") %  ((p or "None"), u, uri))
1048
1049            if p:
1050                # Request with user and project specified
1051                req = {\
1052                        'destinationTestbed' : { 'uri' : uri },
1053                        'project': { 
1054                            'name': {'localname': p},
1055                            'user': [ {'userID': { 'localname': u } } ],
1056                            },
1057                        'user':  user,
1058                        'allocID' : { 'localname': 'test' },
1059                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1060                        'serviceAccess' : service_keys
1061                    }
1062            else:
1063                # Request with only user specified
1064                req = {\
1065                        'destinationTestbed' : { 'uri' : uri },
1066                        'user':  [ {'userID': { 'localname': u } } ],
1067                        'allocID' : { 'localname': 'test' },
1068                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1069                        'serviceAccess' : service_keys
1070                    }
1071
1072            if tb == master:
1073                # NB, the export_project parameter is a dict that includes
1074                # the type
1075                req['exportProject'] = export_project
1076
1077            # node resources if any
1078            if nodes != None and len(nodes) > 0:
1079                rnodes = [ ]
1080                for n in nodes:
1081                    rn = { }
1082                    image, hw, count = n.split(":")
1083                    if image: rn['image'] = [ image ]
1084                    if hw: rn['hardware'] = [ hw ]
1085                    if count and int(count) >0 : rn['count'] = int(count)
1086                    rnodes.append(rn)
1087                req['resources']= { }
1088                req['resources']['node'] = rnodes
1089
1090            try:
1091                if self.local_access.has_key(uri):
1092                    # Local access call
1093                    req = { 'RequestAccessRequestBody' : req }
1094                    r = self.local_access[uri].RequestAccess(req, 
1095                            fedid(file=self.cert_file))
1096                    r = { 'RequestAccessResponseBody' : r }
1097                else:
1098                    r = self.call_RequestAccess(uri, req, 
1099                            self.cert_file, self.cert_pwd, self.trusted_certs)
1100            except service_error, e:
1101                if e.code == service_error.access:
1102                    self.log.debug("[get_access] Access denied")
1103                    r = None
1104                    continue
1105                else:
1106                    raise e
1107
1108            if r.has_key('RequestAccessResponseBody'):
1109                # Through to here we have a valid response, not a fault.
1110                # Access denied is a fault, so something better or worse than
1111                # access denied has happened.
1112                r = r['RequestAccessResponseBody']
1113                self.log.debug("[get_access] Access granted")
1114                break
1115            else:
1116                raise service_error(service_error.protocol,
1117                        "Bad proxy response")
1118       
1119        if not r:
1120            raise service_error(service_error.access, 
1121                    "Access denied by %s (%s)" % (tb, uri))
1122
1123        e = r['emulab']
1124        p = e['project']
1125        tbparam[tb] = { 
1126                "boss": e['boss'],
1127                "host": e['ops'],
1128                "domain": e['domain'],
1129                "fs": e['fileServer'],
1130                "eventserver": e['eventServer'],
1131                "project": unpack_id(p['name']),
1132                "emulab" : e,
1133                "allocID" : r['allocID'],
1134                }
1135        # Make the testbed name be the label the user applied
1136        p['testbed'] = {'localname': tb }
1137
1138        for u in p['user']:
1139            role = u.get('role', None)
1140            if role == 'experimentCreation':
1141                tbparam[tb]['user'] = unpack_id(u['userID'])
1142                break
1143        else:
1144            raise service_error(service_error.internal, 
1145                    "No createExperimentUser from %s" %tb)
1146
1147        # Add attributes to barameter space.  We don't allow attributes to
1148        # overlay any parameters already installed.
1149        for a in e['fedAttr']:
1150            try:
1151                if a['attribute'] and isinstance(a['attribute'], basestring)\
1152                        and not tbparam[tb].has_key(a['attribute'].lower()):
1153                    tbparam[tb][a['attribute'].lower()] = a['value']
1154            except KeyError:
1155                self.log.error("Bad attribute in response: %s" % a)
1156       
1157    def release_access(self, tb, aid):
1158        """
1159        Release access to testbed through fedd
1160        """
1161
1162        uri = self.tbmap.get(tb, None)
1163        if not uri:
1164            raise service_error(serice_error.server_config, 
1165                    "Unknown testbed: %s" % tb)
1166
1167        if self.local_access.has_key(uri):
1168            resp = self.local_access[uri].ReleaseAccess(\
1169                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1170                    fedid(file=self.cert_file))
1171            resp = { 'ReleaseAccessResponseBody': resp } 
1172        else:
1173            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1174                    self.cert_file, self.cert_pwd, self.trusted_certs)
1175
1176        # better error coding
1177
1178    def remote_splitter(self, uri, desc, master):
1179
1180        req = {
1181                'description' : { 'ns2description': desc },
1182                'master': master,
1183                'include_fedkit': bool(self.fedkit),
1184                'include_gatewaykit': bool(self.gatewaykit)
1185            }
1186
1187        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1188                self.trusted_certs)
1189
1190        if r.has_key('Ns2SplitResponseBody'):
1191            r = r['Ns2SplitResponseBody']
1192            if r.has_key('output'):
1193                return r['output'].splitlines()
1194            else:
1195                raise service_error(service_error.protocol, 
1196                        "Bad splitter response (no output)")
1197        else:
1198            raise service_error(service_error.protocol, "Bad splitter response")
1199       
1200    class current_testbed:
1201        """
1202        Object for collecting the current testbed description.  The testbed
1203        description is saved to a file with the local testbed variables
1204        subsittuted line by line.
1205        """
1206        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1207            def tar_list_to_string(tl):
1208                if tl is None: return None
1209
1210                rv = ""
1211                for t in tl:
1212                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1213                            (t[0], os.path.basename(t[1]))
1214                return rv
1215
1216
1217            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1218            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1219            self.current_testbed = None
1220            self.testbed_file = None
1221
1222            self.def_expstart = \
1223                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1224            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1225            self.def_gwstart = \
1226                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1227            self.def_mgwstart = \
1228                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1229            self.def_gwimage = "FBSD61-TUNNEL2";
1230            self.def_gwtype = "pc";
1231            self.def_mgwcmd = '# '
1232            self.def_mgwcmdparams = ''
1233            self.def_gwcmd = '# '
1234            self.def_gwcmdparams = ''
1235
1236            self.eid = eid
1237            self.tmpdir = tmpdir
1238            # Convert fedkit and gateway kit (which are lists of tuples) into a
1239            # substituition string.
1240            self.fedkit = tar_list_to_string(fedkit)
1241            self.gatewaykit = tar_list_to_string(gatewaykit)
1242
1243        def __call__(self, line, master, allocated, tbparams):
1244            # Capture testbed topology descriptions
1245            if self.current_testbed == None:
1246                m = self.begin_testbed.match(line)
1247                if m != None:
1248                    self.current_testbed = m.group(1)
1249                    if self.current_testbed == None:
1250                        raise service_error(service_error.req,
1251                                "Bad request format (unnamed testbed)")
1252                    allocated[self.current_testbed] = \
1253                            allocated.get(self.current_testbed,0) + 1
1254                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1255                    if not os.path.exists(tb_dir):
1256                        try:
1257                            os.mkdir(tb_dir)
1258                        except IOError:
1259                            raise service_error(service_error.internal,
1260                                    "Cannot create %s" % tb_dir)
1261                    try:
1262                        self.testbed_file = open("%s/%s.%s.tcl" %
1263                                (tb_dir, self.eid, self.current_testbed), 'w')
1264                    except IOError:
1265                        self.testbed_file = None
1266                    return True
1267                else: return False
1268            else:
1269                m = self.end_testbed.match(line)
1270                if m != None:
1271                    if m.group(1) != self.current_testbed:
1272                        raise service_error(service_error.internal, 
1273                                "Mismatched testbed markers!?")
1274                    if self.testbed_file != None: 
1275                        self.testbed_file.close()
1276                        self.testbed_file = None
1277                    self.current_testbed = None
1278                elif self.testbed_file:
1279                    # Substitute variables and put the line into the local
1280                    # testbed file.
1281                    gwtype = tbparams[self.current_testbed].get(\
1282                            'connectortype', self.def_gwtype)
1283                    gwimage = tbparams[self.current_testbed].get(\
1284                            'connectorimage', self.def_gwimage)
1285                    mgwstart = tbparams[self.current_testbed].get(\
1286                            'masterconnectorstartcmd', self.def_mgwstart)
1287                    mexpstart = tbparams[self.current_testbed].get(\
1288                            'masternodestartcmd', self.def_mexpstart)
1289                    gwstart = tbparams[self.current_testbed].get(\
1290                            'slaveconnectorstartcmd', self.def_gwstart)
1291                    expstart = tbparams[self.current_testbed].get(\
1292                            'slavenodestartcmd', self.def_expstart)
1293                    project = tbparams[self.current_testbed].get('project')
1294                    gwcmd = tbparams[self.current_testbed].get(\
1295                            'slaveconnectorcmd', self.def_gwcmd)
1296                    gwcmdparams = tbparams[self.current_testbed].get(\
1297                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1298                    mgwcmd = tbparams[self.current_testbed].get(\
1299                            'masterconnectorcmd', self.def_gwcmd)
1300                    mgwcmdparams = tbparams[self.current_testbed].get(\
1301                            'masterconnectorcmdparams', self.def_gwcmdparams)
1302                    line = re.sub("GWTYPE", gwtype, line)
1303                    line = re.sub("GWIMAGE", gwimage, line)
1304                    if self.current_testbed == master:
1305                        line = re.sub("GWSTART", mgwstart, line)
1306                        line = re.sub("EXPSTART", mexpstart, line)
1307                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1308                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1309                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1310                    else:
1311                        line = re.sub("GWSTART", gwstart, line)
1312                        line = re.sub("EXPSTART", expstart, line)
1313                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1314                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1315                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1316                    #These expansions contain EID and PROJDIR.  NB these are
1317                    # local fedkit and gatewaykit, which are strings.
1318                    if self.fedkit:
1319                        line = re.sub("FEDKIT", self.fedkit, line)
1320                    if self.gatewaykit:
1321                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1322                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1323                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1324                    line = re.sub("EID", self.eid, line)
1325                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1326                            (project, self.eid), line)
1327                    print >>self.testbed_file, line
1328                return True
1329
1330    class allbeds:
1331        """
1332        Process the Allbeds section.  Get access to each federant and save the
1333        parameters in tbparams
1334        """
1335        def __init__(self, get_access):
1336            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1337            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1338            self.in_allbeds = False
1339            self.get_access = get_access
1340
1341        def __call__(self, line, user, tbparams, master, export_project,
1342                access_user):
1343            # Testbed access parameters
1344            if not self.in_allbeds:
1345                if self.begin_allbeds.match(line):
1346                    self.in_allbeds = True
1347                    return True
1348                else:
1349                    return False
1350            else:
1351                if self.end_allbeds.match(line):
1352                    self.in_allbeds = False
1353                else:
1354                    nodes = line.split('|')
1355                    tb = nodes.pop(0)
1356                    self.get_access(tb, nodes, user, tbparams, master,
1357                            export_project, access_user)
1358                return True
1359
1360    class gateways:
1361        def __init__(self, eid, master, tmpdir, gw_pubkey,
1362                gw_secretkey, copy_file, fedkit):
1363            self.begin_gateways = \
1364                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1365            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1366            self.current_gateways = None
1367            self.control_gateway = None
1368            self.active_end = { }
1369
1370            self.eid = eid
1371            self.master = master
1372            self.tmpdir = tmpdir
1373            self.gw_pubkey_base = gw_pubkey
1374            self.gw_secretkey_base = gw_secretkey
1375
1376            self.copy_file = copy_file
1377            self.fedkit = fedkit
1378
1379
1380        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1381                active_end, tbparams, dtb, myname, desthost, type):
1382            """
1383            Produce a gateway configuration file from a gateways line.
1384            """
1385
1386            sproject = tbparams[gw].get('project', 'project')
1387            dproject = tbparams[dtb].get('project', 'project')
1388            sdomain = ".%s.%s%s" % (eid, sproject,
1389                    tbparams[gw].get('domain', ".example.com"))
1390            ddomain = ".%s.%s%s" % (eid, dproject,
1391                    tbparams[dtb].get('domain', ".example.com"))
1392            boss = tbparams[master].get('boss', "boss")
1393            fs = tbparams[master].get('fs', "fs")
1394            event_server = "%s%s" % \
1395                    (tbparams[gw].get('eventserver', "event_server"),
1396                            tbparams[gw].get('domain', "example.com"))
1397            remote_event_server = "%s%s" % \
1398                    (tbparams[dtb].get('eventserver', "event_server"),
1399                            tbparams[dtb].get('domain', "example.com"))
1400            seer_control = "%s%s" % \
1401                    (tbparams[gw].get('control', "control"), sdomain)
1402            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1403
1404            if self.fedkit:
1405                remote_script_dir = "/usr/local/federation/bin"
1406                local_script_dir = "/usr/local/federation/bin"
1407            else:
1408                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1409                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1410
1411            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1412            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1413            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1414
1415            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1416            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1417
1418            # translate to lower case so the `hostname` hack for specifying
1419            # configuration files works.
1420            conf_file = conf_file.lower();
1421            remote_conf_file = remote_conf_file.lower();
1422
1423            if dtb == master:
1424                active = "false"
1425            elif gw == master:
1426                active = "true"
1427            elif active_end.has_key('%s-%s' % (dtb, gw)):
1428                active = "false"
1429            else:
1430                active_end['%s-%s' % (gw, dtb)] = 1
1431                active = "true"
1432
1433            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1434            print >>gwconfig, "Active: %s" % active
1435            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1436            if tunnel_iface:
1437                print >>gwconfig, "Interface: %s" % tunnel_iface
1438            print >>gwconfig, "BossName: %s" % boss
1439            print >>gwconfig, "FsName: %s" % fs
1440            print >>gwconfig, "EventServerName: %s" % event_server
1441            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1442            print >>gwconfig, "SeerControl: %s" % seer_control
1443            print >>gwconfig, "Type: %s" % type
1444            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1445            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1446                    local_script_dir
1447            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1448            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1449            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1450                    (remote_conf_dir, remote_conf_file)
1451            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1452            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1453            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1454            gwconfig.close()
1455
1456            return active == "true"
1457
1458        def __call__(self, line, allocated, tbparams):
1459            # Process gateways
1460            if not self.current_gateways:
1461                m = self.begin_gateways.match(line)
1462                if m:
1463                    self.current_gateways = m.group(1)
1464                    if allocated.has_key(self.current_gateways):
1465                        # This test should always succeed
1466                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1467                        if not os.path.exists(tb_dir):
1468                            try:
1469                                os.mkdir(tb_dir)
1470                            except IOError:
1471                                raise service_error(service_error.internal,
1472                                        "Cannot create %s" % tb_dir)
1473                    else:
1474                        # XXX
1475                        self.log.error("[gateways]: Ignoring gateways for " + \
1476                                "unknown testbed %s" % self.current_gateways)
1477                        self.current_gateways = None
1478                    return True
1479                else:
1480                    return False
1481            else:
1482                m = self.end_gateways.match(line)
1483                if m :
1484                    if m.group(1) != self.current_gateways:
1485                        raise service_error(service_error.internal,
1486                                "Mismatched gateway markers!?")
1487                    if self.control_gateway:
1488                        try:
1489                            cc = open("%s/%s/client.conf" %
1490                                    (self.tmpdir, self.current_gateways), 'w')
1491                            print >>cc, "ControlGateway: %s" % \
1492                                    self.control_gateway
1493                            if tbparams[self.master].has_key('smbshare'):
1494                                print >>cc, "SMBSHare: %s" % \
1495                                        tbparams[self.master]['smbshare']
1496                            print >>cc, "ProjectUser: %s" % \
1497                                    tbparams[self.master]['user']
1498                            print >>cc, "ProjectName: %s" % \
1499                                    tbparams[self.master]['project']
1500                            print >>cc, "ExperimentID: %s/%s" % \
1501                                    ( tbparams[self.master]['project'], \
1502                                    self.eid )
1503                            cc.close()
1504                        except IOError:
1505                            raise service_error(service_error.internal,
1506                                    "Error creating client config")
1507                        # XXX: This seer specific file should disappear
1508                        try:
1509                            cc = open("%s/%s/seer.conf" %
1510                                    (self.tmpdir, self.current_gateways),
1511                                    'w')
1512                            if self.current_gateways != self.master:
1513                                print >>cc, "ControlNode: %s" % \
1514                                        self.control_gateway
1515                            print >>cc, "ExperimentID: %s/%s" % \
1516                                    ( tbparams[self.master]['project'], \
1517                                    self.eid )
1518                            cc.close()
1519                        except IOError:
1520                            raise service_error(service_error.internal,
1521                                    "Error creating seer config")
1522                    else:
1523                        debug.error("[gateways]: No control gateway for %s" %\
1524                                    self.current_gateways)
1525                    self.current_gateways = None
1526                else:
1527                    dtb, myname, desthost, type = line.split(" ")
1528
1529                    if type == "control" or type == "both":
1530                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1531                                self.eid, 
1532                                tbparams[self.current_gateways]['project'],
1533                                tbparams[self.current_gateways]['domain'])
1534                    try:
1535                        active = self.gateway_conf_file(self.current_gateways,
1536                                self.master, self.eid, self.gw_pubkey_base,
1537                                self.gw_secretkey_base,
1538                                self.active_end, tbparams, dtb, myname,
1539                                desthost, type)
1540                    except IOError, e:
1541                        raise service_error(service_error.internal,
1542                                "Failed to write config file for %s" % \
1543                                        self.current_gateway)
1544           
1545                    gw_pubkey = "%s/keys/%s" % \
1546                            (self.tmpdir, self.gw_pubkey_base)
1547                    gw_secretkey = "%s/keys/%s" % \
1548                            (self.tmpdir, self.gw_secretkey_base)
1549
1550                    pkfile = "%s/%s/%s" % \
1551                            ( self.tmpdir, self.current_gateways, 
1552                                    self.gw_pubkey_base)
1553                    skfile = "%s/%s/%s" % \
1554                            ( self.tmpdir, self.current_gateways, 
1555                                    self.gw_secretkey_base)
1556
1557                    if not os.path.exists(pkfile):
1558                        try:
1559                            self.copy_file(gw_pubkey, pkfile)
1560                        except IOError:
1561                            service_error(service_error.internal,
1562                                    "Failed to copy pubkey file")
1563
1564                    if active and not os.path.exists(skfile):
1565                        try:
1566                            self.copy_file(gw_secretkey, skfile)
1567                        except IOError:
1568                            service_error(service_error.internal,
1569                                    "Failed to copy secretkey file")
1570                return True
1571
1572    class shunt_to_file:
1573        """
1574        Simple class to write data between two regexps to a file.
1575        """
1576        def __init__(self, begin, end, filename):
1577            """
1578            Begin shunting on a match of begin, stop on end, send data to
1579            filename.
1580            """
1581            self.begin = re.compile(begin)
1582            self.end = re.compile(end)
1583            self.in_shunt = False
1584            self.file = None
1585            self.filename = filename
1586
1587        def __call__(self, line):
1588            """
1589            Call this on each line in the input that may be shunted.
1590            """
1591            if not self.in_shunt:
1592                if self.begin.match(line):
1593                    self.in_shunt = True
1594                    try:
1595                        self.file = open(self.filename, "w")
1596                    except:
1597                        self.file = None
1598                        raise
1599                    return True
1600                else:
1601                    return False
1602            else:
1603                if self.end.match(line):
1604                    if self.file: 
1605                        self.file.close()
1606                        self.file = None
1607                    self.in_shunt = False
1608                else:
1609                    if self.file:
1610                        print >>self.file, line
1611                return True
1612
1613    class shunt_to_list:
1614        """
1615        Same interface as shunt_to_file.  Data collected in self.list, one list
1616        element per line.
1617        """
1618        def __init__(self, begin, end):
1619            self.begin = re.compile(begin)
1620            self.end = re.compile(end)
1621            self.in_shunt = False
1622            self.list = [ ]
1623       
1624        def __call__(self, line):
1625            if not self.in_shunt:
1626                if self.begin.match(line):
1627                    self.in_shunt = True
1628                    return True
1629                else:
1630                    return False
1631            else:
1632                if self.end.match(line):
1633                    self.in_shunt = False
1634                else:
1635                    self.list.append(line)
1636                return True
1637
1638    class shunt_to_string:
1639        """
1640        Same interface as shunt_to_file.  Data collected in self.str, all in
1641        one string.
1642        """
1643        def __init__(self, begin, end):
1644            self.begin = re.compile(begin)
1645            self.end = re.compile(end)
1646            self.in_shunt = False
1647            self.str = ""
1648       
1649        def __call__(self, line):
1650            if not self.in_shunt:
1651                if self.begin.match(line):
1652                    self.in_shunt = True
1653                    return True
1654                else:
1655                    return False
1656            else:
1657                if self.end.match(line):
1658                    self.in_shunt = False
1659                else:
1660                    self.str += line
1661                return True
1662
1663    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1664            tbparams, tmpdir, alloc_log=None):
1665        started = { }           # Testbeds where a sub-experiment started
1666                                # successfully
1667
1668        # XXX
1669        fail_soft = False
1670
1671        log = alloc_log or self.log
1672
1673        thread_pool = self.thread_pool(self.nthreads)
1674        threads = [ ]
1675
1676        for tb in [ k for k in allocated.keys() if k != master]:
1677            # Create and start a thread to start the segment, and save it to
1678            # get the return value later
1679            thread_pool.wait_for_slot()
1680            t  = self.pooled_thread(\
1681                    target=self.start_segment(log=log,
1682                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1683                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1684                    pdata=thread_pool, trace_file=self.trace_file)
1685            threads.append(t)
1686            t.start()
1687
1688        # Wait until all finish
1689        thread_pool.wait_for_all_done()
1690
1691        # If none failed, start the master
1692        failed = [ t.getName() for t in threads if not t.rv ]
1693
1694        if len(failed) == 0:
1695            starter = self.start_segment(log=log, 
1696                    keyfile=self.ssh_privkey_file, debug=self.debug)
1697            if not starter(master, eid, tbparams, tmpdir):
1698                failed.append(master)
1699
1700        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1701        # If one failed clean up, unless fail_soft is set
1702        if failed:
1703            if not fail_soft:
1704                thread_pool.clear()
1705                for tb in succeeded:
1706                    # Create and start a thread to stop the segment
1707                    thread_pool.wait_for_slot()
1708                    t  = self.pooled_thread(\
1709                            target=self.stop_segment(log=log,
1710                                keyfile=self.ssh_privkey_file,
1711                                debug=self.debug), 
1712                            args=(tb, eid, tbparams), name=tb,
1713                            pdata=thread_pool, trace_file=self.trace_file)
1714                    t.start()
1715                # Wait until all finish
1716                thread_pool.wait_for_all_done()
1717
1718                # release the allocations
1719                for tb in tbparams.keys():
1720                    self.release_access(tb, tbparams[tb]['allocID'])
1721                # Remove the placeholder
1722                self.state_lock.acquire()
1723                self.state[eid]['experimentStatus'] = 'failed'
1724                if self.state_filename: self.write_state()
1725                self.state_lock.release()
1726
1727                #raise service_error(service_error.federant,
1728                #    "Swap in failed on %s" % ",".join(failed))
1729                log.error("Swap in failed on %s" % ",".join(failed))
1730                return
1731        else:
1732            log.info("[start_segment]: Experiment %s active" % eid)
1733
1734        log.debug("[start_experiment]: removing %s" % tmpdir)
1735
1736        # Walk up tmpdir, deleting as we go
1737        for path, dirs, files in os.walk(tmpdir, topdown=False):
1738            for f in files:
1739                os.remove(os.path.join(path, f))
1740            for d in dirs:
1741                os.rmdir(os.path.join(path, d))
1742        os.rmdir(tmpdir)
1743
1744        # Insert the experiment into our state and update the disk copy
1745        self.state_lock.acquire()
1746        self.state[expid]['experimentStatus'] = 'active'
1747        self.state[eid] = self.state[expid]
1748        if self.state_filename: self.write_state()
1749        self.state_lock.release()
1750        return
1751
1752    def create_experiment(self, req, fid):
1753        """
1754        The external interface to experiment creation called from the
1755        dispatcher.
1756
1757        Creates a working directory, splits the incoming description using the
1758        splitter script and parses out the avrious subsections using the
1759        lcasses above.  Once each sub-experiment is created, use pooled threads
1760        to instantiate them and start it all up.
1761        """
1762
1763        if not self.auth.check_attribute(fid, 'create'):
1764            raise service_error(service_error.access, "Create access denied")
1765
1766        try:
1767            tmpdir = tempfile.mkdtemp(prefix="split-")
1768        except IOError:
1769            raise service_error(service_error.internal, "Cannot create tmp dir")
1770
1771        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1772        gw_secretkey_base = "fed.%s" % self.ssh_type
1773        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1774        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1775        tclfile = tmpdir + "/experiment.tcl"
1776        tbparams = { }
1777        try:
1778            access_user = self.accessdb[fid]
1779        except KeyError:
1780            raise service_error(service_error.internal,
1781                    "Access map and authorizer out of sync in " + \
1782                            "create_experiment for fedid %s"  % fid)
1783
1784        pid = "dummy"
1785        gid = "dummy"
1786        try:
1787            os.mkdir(tmpdir+"/keys")
1788        except OSError:
1789            raise service_error(service_error.internal,
1790                    "Can't make temporary dir")
1791
1792        req = req.get('CreateRequestBody', None)
1793        if not req:
1794            raise service_error(service_error.req,
1795                    "Bad request format (no CreateRequestBody)")
1796        # The tcl parser needs to read a file so put the content into that file
1797        descr=req.get('experimentdescription', None)
1798        if descr:
1799            file_content=descr.get('ns2description', None)
1800            if file_content:
1801                try:
1802                    f = open(tclfile, 'w')
1803                    f.write(file_content)
1804                    f.close()
1805                except IOError:
1806                    raise service_error(service_error.internal,
1807                            "Cannot write temp experiment description")
1808            else:
1809                raise service_error(service_error.req, 
1810                        "Only ns2descriptions supported")
1811        else:
1812            raise service_error(service_error.req, "No experiment description")
1813
1814        # Generate an ID for the experiment (slice) and a certificate that the
1815        # allocator can use to prove they own it.  We'll ship it back through
1816        # the encrypted connection.
1817        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1818
1819        if req.has_key('experimentID') and \
1820                req['experimentID'].has_key('localname'):
1821            eid = req['experimentID']['localname']
1822            self.state_lock.acquire()
1823            while (self.state.has_key(eid)):
1824                eid += random.choice(string.ascii_letters)
1825            # Initial state
1826            self.state[eid] = {
1827                    'experimentID' : \
1828                            [ { 'localname' : eid }, {'fedid': expid } ],
1829                    'experimentStatus': 'starting',
1830                    'experimentAccess': { 'X509' : expcert },
1831                    'owner': fid,
1832                    'log' : [],
1833                }
1834            self.state[expid] = self.state[eid]
1835            if self.state_filename: self.write_state()
1836            self.state_lock.release()
1837        else:
1838            eid = self.exp_stem
1839            for i in range(0,5):
1840                eid += random.choice(string.ascii_letters)
1841            self.state_lock.acquire()
1842            while (self.state.has_key(eid)):
1843                eid = self.exp_stem
1844                for i in range(0,5):
1845                    eid += random.choice(string.ascii_letters)
1846            # Initial state
1847            self.state[eid] = {
1848                    'experimentID' : \
1849                            [ { 'localname' : eid }, {'fedid': expid } ],
1850                    'experimentStatus': 'starting',
1851                    'experimentAccess': { 'X509' : expcert },
1852                    'owner': fid,
1853                    'log' : [],
1854                }
1855            self.state[expid] = self.state[eid]
1856            if self.state_filename: self.write_state()
1857            self.state_lock.release()
1858
1859        try: 
1860            # This catches exceptions to clear the placeholder if necessary
1861            try:
1862                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1863            except ValueError:
1864                raise service_error(service_error.server_config, 
1865                        "Bad key type (%s)" % self.ssh_type)
1866
1867            user = req.get('user', None)
1868            if user == None:
1869                raise service_error(service_error.req, "No user")
1870
1871            master = req.get('master', None)
1872            if not master:
1873                raise service_error(service_error.req,
1874                        "No master testbed label")
1875            export_project = req.get('exportProject', None)
1876            if not export_project:
1877                raise service_error(service_error.req, "No export project")
1878           
1879            if self.splitter_url:
1880                self.log.debug("Calling remote splitter at %s" % \
1881                        self.splitter_url)
1882                split_data = self.remote_splitter(self.splitter_url,
1883                        file_content, master)
1884            else:
1885                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1886                    str(self.muxmax), '-m', master]
1887
1888                if self.fedkit:
1889                    tclcmd.append('-k')
1890
1891                if self.gatewaykit:
1892                    tclcmd.append('-K')
1893
1894                tclcmd.extend([pid, gid, eid, tclfile])
1895
1896                self.log.debug("running local splitter %s", " ".join(tclcmd))
1897                tclparser = Popen(tclcmd, stdout=PIPE)
1898                split_data = tclparser.stdout
1899
1900            allocated = { }         # Testbeds we can access
1901            # Objects to parse the splitter output (defined above)
1902            parse_current_testbed = self.current_testbed(eid, tmpdir,
1903                    self.fedkit, self.gatewaykit)
1904            parse_allbeds = self.allbeds(self.get_access)
1905            parse_gateways = self.gateways(eid, master, tmpdir,
1906                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1907                    self.fedkit)
1908            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1909                        "^#\s+End\s+Vtopo")
1910            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1911                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1912            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1913                    "^#\s+End\s+tarfiles")
1914            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1915                    "^#\s+End\s+rpms")
1916
1917            # Working on the split data
1918            for line in split_data:
1919                line = line.rstrip()
1920                if parse_current_testbed(line, master, allocated, tbparams):
1921                    continue
1922                elif parse_allbeds(line, user, tbparams, master, export_project,
1923                        access_user):
1924                    continue
1925                elif parse_gateways(line, allocated, tbparams):
1926                    continue
1927                elif parse_vtopo(line):
1928                    continue
1929                elif parse_hostnames(line):
1930                    continue
1931                elif parse_tarfiles(line):
1932                    continue
1933                elif parse_rpms(line):
1934                    continue
1935                else:
1936                    raise service_error(service_error.internal, 
1937                            "Bad tcl parse? %s" % line)
1938            # Virtual topology and visualization
1939            vtopo = self.gentopo(parse_vtopo.str)
1940            if not vtopo:
1941                raise service_error(service_error.internal, 
1942                        "Failed to generate virtual topology")
1943
1944            vis = self.genviz(vtopo)
1945            if not vis:
1946                raise service_error(service_error.internal, 
1947                        "Failed to generate visualization")
1948
1949           
1950            # save federant information
1951            for k in allocated.keys():
1952                tbparams[k]['federant'] = {\
1953                        'name': [ { 'localname' : eid} ],\
1954                        'emulab': tbparams[k]['emulab'],\
1955                        'allocID' : tbparams[k]['allocID'],\
1956                        'master' : k == master,\
1957                    }
1958
1959            self.state_lock.acquire()
1960            self.state[eid]['vtopo'] = vtopo
1961            self.state[eid]['vis'] = vis
1962            self.state[expid]['federant'] = \
1963                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1964                        if tbparams[tb].has_key('federant') ]
1965            if self.state_filename: self.write_state()
1966            self.state_lock.release()
1967
1968            # Copy tarfiles and rpms needed at remote sites into a staging area
1969            try:
1970                if self.fedkit:
1971                    for t in self.fedkit:
1972                        parse_tarfiles.list.append(t[1])
1973                if self.gatewaykit:
1974                    for t in self.gatewaykit:
1975                        parse_tarfiles.list.append(t[1])
1976                for t in parse_tarfiles.list:
1977                    if not os.path.exists("%s/tarfiles" % tmpdir):
1978                        os.mkdir("%s/tarfiles" % tmpdir)
1979                    self.copy_file(t, "%s/tarfiles/%s" % \
1980                            (tmpdir, os.path.basename(t)))
1981                for r in parse_rpms.list:
1982                    if not os.path.exists("%s/rpms" % tmpdir):
1983                        os.mkdir("%s/rpms" % tmpdir)
1984                    self.copy_file(r, "%s/rpms/%s" % \
1985                            (tmpdir, os.path.basename(r)))
1986                # A null experiment file in case we need to create a remote
1987                # experiment from scratch
1988                f = open("%s/null.tcl" % tmpdir, "w")
1989                print >>f, """
1990set ns [new Simulator]
1991source tb_compat.tcl
1992
1993set a [$ns node]
1994
1995$ns rtproto Session
1996$ns run
1997"""
1998                f.close()
1999
2000            except IOError, e:
2001                raise service_error(service_error.internal, 
2002                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2003
2004        except service_error, e:
2005            # If something goes wrong in the parse (usually an access error)
2006            # clear the placeholder state.  From here on out the code delays
2007            # exceptions.  Failing at this point returns a fault to the remote
2008            # caller.
2009            self.state_lock.acquire()
2010            del self.state[eid]
2011            del self.state[expid]
2012            if self.state_filename: self.write_state()
2013            self.state_lock.release()
2014            raise e
2015
2016
2017        # Start the background swapper and return the starting state.  From
2018        # here on out, the state will stick around a while.
2019
2020        # Let users touch the state
2021        self.auth.set_attribute(fid, expid)
2022        self.auth.set_attribute(expid, expid)
2023        # Override fedids can manipulate state as well
2024        for o in self.overrides:
2025            self.auth.set_attribute(o, expid)
2026
2027        # Create a logger that logs to the experiment's state object as well as
2028        # to the main log file.
2029
2030        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2031        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2032        # XXX: there should be a global one of these rather than repeating the
2033        # code.
2034        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2035                    '%d %b %y %H:%M:%S'))
2036        alloc_log.addHandler(h)
2037       
2038
2039
2040
2041
2042        # Start a thread to do the resource allocation
2043        t  = Thread(target=self.allocate_resources,
2044                args=(allocated, master, eid, expid, expcert, tbparams, 
2045                    tmpdir, alloc_log),
2046                name=eid)
2047        t.start()
2048
2049        rv = {
2050                'experimentID': [
2051                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2052                ],
2053                'experimentStatus': 'starting',
2054                'experimentAccess': { 'X509' : expcert }
2055            }
2056
2057        return rv
2058
2059    def check_experiment_access(self, fid, key):
2060        """
2061        Confirm that the fid has access to the experiment.  Though a request
2062        may be made in terms of a local name, the access attribute is always
2063        the experiment's fedid.
2064        """
2065        if not isinstance(key, fedid):
2066            self.state_lock.acquire()
2067            if self.state.has_key(key):
2068                if isinstance(self.state[key], dict):
2069                    try:
2070                        kl = [ f['fedid'] for f in \
2071                                self.state[key]['experimentID']\
2072                                    if f.has_key('fedid') ]
2073                    except KeyError:
2074                        self.state_lock.release()
2075                        raise service_error(service_error.internal, 
2076                                "No fedid for experiment %s when checking " +\
2077                                        "access(!?)" % key)
2078                    if len(kl) == 1:
2079                        key = kl[0]
2080                    else:
2081                        self.state_lock.release()
2082                        raise service_error(service_error.internal, 
2083                                "multiple fedids for experiment %s when " +\
2084                                        "checking access(!?)" % key)
2085                elif isinstance(self.state[key], str):
2086                    self.state_lock.release()
2087                    raise service_error(service_error.internal, 
2088                            ("experiment %s is placeholder.  " +\
2089                                    "Creation in progress or aborted oddly") \
2090                                    % key)
2091                else:
2092                    self.state_lock.release()
2093                    raise service_error(service_error.internal, 
2094                            "Unexpected state for %s" % key)
2095
2096            else:
2097                self.state_lock.release()
2098                raise service_error(service_error.access, "Access Denied")
2099            self.state_lock.release()
2100
2101        if self.auth.check_attribute(fid, key):
2102            return True
2103        else:
2104            raise service_error(service_error.access, "Access Denied")
2105
2106
2107
2108    def get_vtopo(self, req, fid):
2109        """
2110        Return the stored virtual topology for this experiment
2111        """
2112        rv = None
2113        state = None
2114
2115        req = req.get('VtopoRequestBody', None)
2116        if not req:
2117            raise service_error(service_error.req,
2118                    "Bad request format (no VtopoRequestBody)")
2119        exp = req.get('experiment', None)
2120        if exp:
2121            if exp.has_key('fedid'):
2122                key = exp['fedid']
2123                keytype = "fedid"
2124            elif exp.has_key('localname'):
2125                key = exp['localname']
2126                keytype = "localname"
2127            else:
2128                raise service_error(service_error.req, "Unknown lookup type")
2129        else:
2130            raise service_error(service_error.req, "No request?")
2131
2132        self.check_experiment_access(fid, key)
2133
2134        self.state_lock.acquire()
2135        if self.state.has_key(key):
2136            if self.state[key].has_key('vtopo'):
2137                rv = { 'experiment' : {keytype: key },\
2138                        'vtopo': self.state[key]['vtopo'],\
2139                    }
2140            else:
2141                state = self.state[key]['experimentStatus']
2142        self.state_lock.release()
2143
2144        if rv: return rv
2145        else: 
2146            if state:
2147                raise service_error(service_error.partial, 
2148                        "Not ready: %s" % state)
2149            else:
2150                raise service_error(service_error.req, "No such experiment")
2151
2152    def get_vis(self, req, fid):
2153        """
2154        Return the stored visualization for this experiment
2155        """
2156        rv = None
2157        state = None
2158
2159        req = req.get('VisRequestBody', None)
2160        if not req:
2161            raise service_error(service_error.req,
2162                    "Bad request format (no VisRequestBody)")
2163        exp = req.get('experiment', None)
2164        if exp:
2165            if exp.has_key('fedid'):
2166                key = exp['fedid']
2167                keytype = "fedid"
2168            elif exp.has_key('localname'):
2169                key = exp['localname']
2170                keytype = "localname"
2171            else:
2172                raise service_error(service_error.req, "Unknown lookup type")
2173        else:
2174            raise service_error(service_error.req, "No request?")
2175
2176        self.check_experiment_access(fid, key)
2177
2178        self.state_lock.acquire()
2179        if self.state.has_key(key):
2180            if self.state[key].has_key('vis'):
2181                rv =  { 'experiment' : {keytype: key },\
2182                        'vis': self.state[key]['vis'],\
2183                        }
2184            else:
2185                state = self.state[key]['experimentStatus']
2186        self.state_lock.release()
2187
2188        if rv: return rv
2189        else:
2190            if state:
2191                raise service_error(service_error.partial, 
2192                        "Not ready: %s" % state)
2193            else:
2194                raise service_error(service_error.req, "No such experiment")
2195
2196    def clean_info_response(self, rv):
2197        """
2198        Remove the information in the experiment's state object that is not in
2199        the info response.
2200        """
2201        # Remove the owner info (should always be there, but...)
2202        if rv.has_key('owner'): del rv['owner']
2203
2204        # Convert the log into the allocationLog parameter and remove the
2205        # log entry (with defensive programming)
2206        if rv.has_key('log'):
2207            rv['allocationLog'] = "".join(rv['log'])
2208            del rv['log']
2209        else:
2210            rv['allocationLog'] = ""
2211
2212        if rv['experimentStatus'] != 'active':
2213            if rv.has_key('federant'): del rv['federant']
2214        else:
2215            # remove the allocationID info from each federant
2216            for f in rv.get('federant', []):
2217                if f.has_key('allocID'): del f['allocID']
2218
2219        return rv
2220
2221    def get_info(self, req, fid):
2222        """
2223        Return all the stored info about this experiment
2224        """
2225        rv = None
2226
2227        req = req.get('InfoRequestBody', None)
2228        if not req:
2229            raise service_error(service_error.req,
2230                    "Bad request format (no InfoRequestBody)")
2231        exp = req.get('experiment', None)
2232        if exp:
2233            if exp.has_key('fedid'):
2234                key = exp['fedid']
2235                keytype = "fedid"
2236            elif exp.has_key('localname'):
2237                key = exp['localname']
2238                keytype = "localname"
2239            else:
2240                raise service_error(service_error.req, "Unknown lookup type")
2241        else:
2242            raise service_error(service_error.req, "No request?")
2243
2244        self.check_experiment_access(fid, key)
2245
2246        # The state may be massaged by the service function that called
2247        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2248        # state.
2249        self.state_lock.acquire()
2250        if self.state.has_key(key):
2251            rv = copy.deepcopy(self.state[key])
2252        self.state_lock.release()
2253
2254        if rv:
2255            return self.clean_info_response(rv)
2256        else:
2257            raise service_error(service_error.req, "No such experiment")
2258
2259    def get_multi_info(self, req, fid):
2260        """
2261        Return all the stored info that this fedid can access
2262        """
2263        rv = { 'info': [ ] }
2264
2265        self.state_lock.acquire()
2266        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2267            self.check_experiment_access(fid, key)
2268
2269            if self.state.has_key(key):
2270                e = copy.deepcopy(self.state[key])
2271                e = self.clean_info_response(e)
2272                rv['info'].append(e)
2273        self.state_lock.release()
2274        return rv
2275
2276
2277    def terminate_experiment(self, req, fid):
2278        """
2279        Swap this experiment out on the federants and delete the shared
2280        information
2281        """
2282        tbparams = { }
2283        req = req.get('TerminateRequestBody', None)
2284        if not req:
2285            raise service_error(service_error.req,
2286                    "Bad request format (no TerminateRequestBody)")
2287        force = req.get('force', False)
2288        exp = req.get('experiment', None)
2289        if exp:
2290            if exp.has_key('fedid'):
2291                key = exp['fedid']
2292                keytype = "fedid"
2293            elif exp.has_key('localname'):
2294                key = exp['localname']
2295                keytype = "localname"
2296            else:
2297                raise service_error(service_error.req, "Unknown lookup type")
2298        else:
2299            raise service_error(service_error.req, "No request?")
2300
2301        self.check_experiment_access(fid, key)
2302
2303        self.state_lock.acquire()
2304        fed_exp = self.state.get(key, None)
2305
2306        if fed_exp:
2307            # This branch of the conditional holds the lock to generate a
2308            # consistent temporary tbparams variable to deallocate experiments.
2309            # It releases the lock to do the deallocations and reacquires it to
2310            # remove the experiment state when the termination is complete.
2311
2312            # First make sure that the experiment creation is complete.
2313            status = fed_exp.get('experimentStatus', None)
2314            if status:
2315                if status == 'starting':
2316                    if not force:
2317                        self.state_lock.release()
2318                        raise service_error(service_error.partial, 
2319                                'Experiment still being created')
2320                    else:
2321                        self.log.warning('Experiment in starting state ' + \
2322                                'being terminated by admin.')
2323            else:
2324                # No status??? trouble
2325                self.state_lock.release()
2326                raise service_error(service_error.internal,
2327                        "Experiment has no status!?")
2328
2329            ids = []
2330            #  experimentID is a list of dicts that are self-describing
2331            #  identifiers.  This finds all the fedids and localnames - the
2332            #  keys of self.state - and puts them into ids.
2333            for id in fed_exp.get('experimentID', []):
2334                if id.has_key('fedid'): ids.append(id['fedid'])
2335                if id.has_key('localname'): ids.append(id['localname'])
2336
2337            # Construct enough of the tbparams to make the stop_segment calls
2338            # work
2339            for fed in fed_exp.get('federant', []):
2340                try:
2341                    for e in fed['name']:
2342                        eid = e.get('localname', None)
2343                        if eid: break
2344                    else:
2345                        continue
2346
2347                    p = fed['emulab']['project']
2348
2349                    project = p['name']['localname']
2350                    tb = p['testbed']['localname']
2351                    user = p['user'][0]['userID']['localname']
2352
2353                    domain = fed['emulab']['domain']
2354                    host  = fed['emulab']['ops']
2355                    aid = fed['allocID']
2356                except KeyError, e:
2357                    continue
2358                tbparams[tb] = {\
2359                        'user': user,\
2360                        'domain': domain,\
2361                        'project': project,\
2362                        'host': host,\
2363                        'eid': eid,\
2364                        'aid': aid,\
2365                    }
2366            self.state_lock.release()
2367
2368            # Stop everyone.
2369            thread_pool = self.thread_pool(self.nthreads)
2370            for tb in tbparams.keys():
2371                # Create and start a thread to stop the segment
2372                thread_pool.wait_for_slot()
2373                t  = self.pooled_thread(\
2374                        target=self.stop_segment(log=self.log,
2375                            keyfile=self.ssh_privkey_file, debug=self.debug), 
2376                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2377                        pdata=thread_pool, trace_file=self.trace_file)
2378                t.start()
2379            # Wait for completions
2380            thread_pool.wait_for_all_done()
2381
2382            # release the allocations (failed experiments have done this
2383            # already, and starting experiments may be in odd states, so we
2384            # ignore errors releasing those allocations
2385            try: 
2386                for tb in tbparams.keys():
2387                    self.release_access(tb, tbparams[tb]['aid'])
2388            except service_error, e:
2389                if status != 'failed' and not force:
2390                    raise e
2391
2392            # Remove the terminated experiment
2393            self.state_lock.acquire()
2394            for id in ids:
2395                if self.state.has_key(id): del self.state[id]
2396
2397            if self.state_filename: self.write_state()
2398            self.state_lock.release()
2399
2400            return { 'experiment': exp }
2401        else:
2402            # Don't forget to release the lock
2403            self.state_lock.release()
2404            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.