source: fedd/federation/experiment_control.py @ 4ea1e22

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 4ea1e22 was 4ea1e22, checked in by Ted Faber <faber@…>, 15 years ago

make sure that subprocesses aren\'t holding the server sockets open.

  • Property mode set to 100644
File size: 89.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
550            """
551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
555
556            try:
557                dnull = open("/dev/null", "r")
558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
568                else:
569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
580                    else:
581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
585                else:
586                    return sub.wait() == 0
587            else:
588                return True
589
590    class start_segment(emulab_segment):
591        def __init__(self, log=None, keyfile=None, debug=False):
592            experiment_control_local.emulab_segment.__init__(self,
593                    log=log, keyfile=keyfile, debug=debug)
594
595        def create_config_tree(self, src_dir, dest_dir, script):
596            """
597            Append commands to script that will create the directory hierarchy
598            on the remote federant.
599            """
600
601            if os.path.isdir(src_dir):
602                print >>script, "mkdir -p %s" % dest_dir
603                print >>script, "chmod 770 %s" % dest_dir
604
605                for f in os.listdir(src_dir):
606                    if os.path.isdir(f):
607                        self.create_config_tree("%s/%s" % (src_dir, f), 
608                                "%s/%s" % (dest_dir, f), script)
609            else:
610                self.log.debug("[create_config_tree]: Not a directory: %s" \
611                        % src_dir)
612
613        def ship_configs(self, host, user, src_dir, dest_dir):
614            """
615            Copy federant-specific configuration files to the federant.
616            """
617            for f in os.listdir(src_dir):
618                if os.path.isdir(f):
619                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
620                            "%s/%s" % (dest_dir, f)):
621                        return False
622                else:
623                    if not self.scp_file("%s/%s" % (src_dir, f), 
624                            user, host, dest_dir):
625                        return False
626            return True
627
628        def get_state(self, user, host, tb, pid, eid):
629            # command to test experiment state
630            expinfo_exec = "/usr/testbed/bin/expinfo" 
631            # Regular expressions to parse the expinfo response
632            state_re = re.compile("State:\s+(\w+)")
633            no_exp_re = re.compile("^No\s+such\s+experiment")
634            swapping_re = re.compile("^No\s+information\s+available.")
635            state = None    # Experiment state parsed from expinfo
636            # The expinfo ssh command.  Note the identity restriction to use
637            # only the identity provided in the pubkey given.
638            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
639                    'StrictHostKeyChecking yes', '-i', 
640                    self.ssh_privkey_file, "%s@%s" % (user, host), 
641                    expinfo_exec, pid, eid]
642
643            dev_null = None
644            try:
645                dev_null = open("/dev/null", "a")
646            except IOError, e:
647                self.log.error("[get_state]: can't open /dev/null: %s" %e)
648
649            if self.debug:
650                state = 'swapped'
651                rv = 0
652            else:
653                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
654                        close_fds=True)
655                for line in status.stdout:
656                    m = state_re.match(line)
657                    if m: state = m.group(1)
658                    else:
659                        for reg, st in ((no_exp_re, "none"),
660                                (swapping_re, "swapping")):
661                            m = reg.match(line)
662                            if m: state = st
663                rv = status.wait()
664
665            # If the experiment is not present the subcommand returns a
666            # non-zero return value.  If we successfully parsed a "none"
667            # outcome, ignore the return code.
668            if rv != 0 and state != 'none':
669                raise service_error(service_error.internal,
670                        "Cannot get status of segment %s:%s/%s" % \
671                                (tb, pid, eid))
672            elif state not in ('active', 'swapped', 'swapping', 'none'):
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            else: return state
677
678
679        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
680            """
681            Start a sub-experiment on a federant.
682
683            Get the current state, modify or create as appropriate, ship data
684            and configs and start the experiment.  There are small ordering
685            differences based on the initial state of the sub-experiment.
686            """
687            # ops node in the federant
688            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
689            user = tbparams[tb]['user']     # federant user
690            pid = tbparams[tb]['project']   # federant project
691            # XXX
692            base_confs = ( "hosts",)
693            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
694            # Configuration directories on the remote machine
695            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
696            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
697            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
698
699            state = self.get_state(user, host, tb, pid, eid)
700
701            self.log.debug("[start_segment]: %s: %s" % (tb, state))
702            self.log.info("[start_segment]:transferring experiment to %s" % tb)
703
704            if not self.scp_file("%s/%s/%s" % \
705                    (tmpdir, tb, tclfile), user, host):
706                return False
707           
708            if state == 'none':
709                # Create a null copy of the experiment so that we capture any
710                # logs there if the modify fails.  Emulab software discards the
711                # logs from a failed startexp
712                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
713                    return False
714                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
715                timedout = False
716                try:
717                    if not self.ssh_cmd(user, host,
718                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
719                            "-e %s null.tcl") % (pid, eid), "startexp",
720                            timeout=60 * 10):
721                        return False
722                except self.ssh_cmd_timeout:
723                    timedout = True
724
725                if timedout:
726                    state = self.get_state(user, host, self.ssh_privkey_file, 
727                            tb, eid, pid)
728                    if state != "swapped":
729                        return False
730
731           
732            # Open up a temporary file to contain a script for setting up the
733            # filespace for the new experiment.
734            self.log.info("[start_segment]: creating script file")
735            try:
736                sf, scriptname = tempfile.mkstemp()
737                scriptfile = os.fdopen(sf, 'w')
738            except IOError:
739                return False
740
741            scriptbase = os.path.basename(scriptname)
742
743            # Script the filesystem changes
744            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
745            # Clear and create the tarfiles and rpm directories
746            for d in (tarfiles_dir, rpms_dir):
747                print >>scriptfile, "/bin/rm -rf %s/*" % d
748                print >>scriptfile, "mkdir -p %s" % d
749            print >>scriptfile, 'mkdir -p %s' % proj_dir
750            self.create_config_tree("%s/%s" % (tmpdir, tb),
751                    proj_dir, scriptfile)
752            if os.path.isdir("%s/tarfiles" % tmpdir):
753                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
754                        scriptfile)
755            if os.path.isdir("%s/rpms" % tmpdir):
756                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
757                        scriptfile)
758            print >>scriptfile, "rm -f %s" % scriptbase
759            scriptfile.close()
760
761            # Move the script to the remote machine
762            # XXX: could collide tempfile names on the remote host
763            if self.scp_file(scriptname, user, host, scriptbase):
764                os.remove(scriptname)
765            else:
766                return False
767
768            # Execute the script (and the script's last line deletes it)
769            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
770                return False
771
772            for f in base_confs:
773                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
774                        "%s/%s" % (proj_dir, f)):
775                    return False
776            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
777                    proj_dir):
778                return False
779            if os.path.isdir("%s/tarfiles" % tmpdir):
780                if not self.ship_configs(host, user,
781                        "%s/tarfiles" % tmpdir, tarfiles_dir):
782                    return False
783            if os.path.isdir("%s/rpms" % tmpdir):
784                if not self.ship_configs(host, user,
785                        "%s/rpms" % tmpdir, tarfiles_dir):
786                    return False
787            # Stage the new configuration (active experiments will stay swapped
788            # in now)
789            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
790            try:
791                if not self.ssh_cmd(user, host,
792                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
793                                (pid, eid, tclfile),
794                        "modexp", timeout= 60 * 10):
795                    return False
796            except self.ssh_cmd_timeout:
797                print "modexp timeout"
798                # There's really no way to see if this succeeded or failed, so
799                # if it hangs, assume the worst.
800                return False
801            # Active experiments are still swapped, this swaps the others in.
802            if state != 'active':
803                self.log.info("[start_segment]: Swapping %s in on %s" % \
804                        (eid, tb))
805                timedout = False
806                try:
807                    if not self.ssh_cmd(user, host,
808                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
809                            "swapexp", timeout=10*60):
810                        return False
811                except self.ssh_cmd_timeout:
812                    timedout = True
813               
814                # If the command was terminated, but completed successfully,
815                # report success.
816                if timedout:
817                    state = self.get_state(user, host, self.ssh_privkey_file,
818                            tb, eid, pid)
819                    self.log.debug("[start_segment]: swapin timed out (state)")
820                    return state == 'active'
821            # Everything has gone OK.
822            return True
823
824    class stop_segment(emulab_segment):
825        def __init__(self, log=None, keyfile=None, debug=False):
826            experiment_control_local.emulab_segment.__init__(self,
827                    log=log, keyfile=keyfile, debug=debug)
828
829        def __call__(self, tb, eid, tbparams):
830            """
831            Stop a sub experiment by calling swapexp on the federant
832            """
833            user = tbparams[tb]['user']
834            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
835            pid = tbparams[tb]['project']
836
837            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
838            return self.ssh_cmd(user, host,
839                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
840
841       
842    def generate_ssh_keys(self, dest, type="rsa" ):
843        """
844        Generate a set of keys for the gateways to use to talk.
845
846        Keys are of type type and are stored in the required dest file.
847        """
848        valid_types = ("rsa", "dsa")
849        t = type.lower();
850        if t not in valid_types: raise ValueError
851        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
852
853        try:
854            trace = open("/dev/null", "w")
855        except IOError:
856            raise service_error(service_error.internal,
857                    "Cannot open /dev/null??");
858
859        # May raise CalledProcessError
860        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
861        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
862        if rv != 0:
863            raise service_error(service_error.internal, 
864                    "Cannot generate nonce ssh keys.  %s return code %d" \
865                            % (self.ssh_keygen, rv))
866
867    def gentopo(self, str):
868        """
869        Generate the topology dtat structure from the splitter's XML
870        representation of it.
871
872        The topology XML looks like:
873            <experiment>
874                <nodes>
875                    <node><vname></vname><ips>ip1:ip2</ips></node>
876                </nodes>
877                <lans>
878                    <lan>
879                        <vname></vname><vnode></vnode><ip></ip>
880                        <bandwidth></bandwidth><member>node:port</member>
881                    </lan>
882                </lans>
883        """
884        class topo_parse:
885            """
886            Parse the topology XML and create the dats structure.
887            """
888            def __init__(self):
889                # Typing of the subelements for data conversion
890                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
891                self.int_subelements = ( 'bandwidth',)
892                self.float_subelements = ( 'delay',)
893                # The final data structure
894                self.nodes = [ ]
895                self.lans =  [ ]
896                self.topo = { \
897                        'node': self.nodes,\
898                        'lan' : self.lans,\
899                    }
900                self.element = { }  # Current element being created
901                self.chars = ""     # Last text seen
902
903            def end_element(self, name):
904                # After each sub element the contents is added to the current
905                # element or to the appropriate list.
906                if name == 'node':
907                    self.nodes.append(self.element)
908                    self.element = { }
909                elif name == 'lan':
910                    self.lans.append(self.element)
911                    self.element = { }
912                elif name in self.str_subelements:
913                    self.element[name] = self.chars
914                    self.chars = ""
915                elif name in self.int_subelements:
916                    self.element[name] = int(self.chars)
917                    self.chars = ""
918                elif name in self.float_subelements:
919                    self.element[name] = float(self.chars)
920                    self.chars = ""
921
922            def found_chars(self, data):
923                self.chars += data.rstrip()
924
925
926        tp = topo_parse();
927        parser = xml.parsers.expat.ParserCreate()
928        parser.EndElementHandler = tp.end_element
929        parser.CharacterDataHandler = tp.found_chars
930
931        parser.Parse(str)
932
933        return tp.topo
934       
935
936    def genviz(self, topo):
937        """
938        Generate the visualization the virtual topology
939        """
940
941        neato = "/usr/local/bin/neato"
942        # These are used to parse neato output and to create the visualization
943        # file.
944        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
945        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
946                "%s</type></node>"
947
948        try:
949            # Node names
950            nodes = [ n['vname'] for n in topo['node'] ]
951            topo_lans = topo['lan']
952        except KeyError:
953            raise service_error(service_error.internal, "Bad topology")
954
955        lans = { }
956        links = { }
957
958        # Walk through the virtual topology, organizing the connections into
959        # 2-node connections (links) and more-than-2-node connections (lans).
960        # When a lan is created, it's added to the list of nodes (there's a
961        # node in the visualization for the lan).
962        for l in topo_lans:
963            if links.has_key(l['vname']):
964                if len(links[l['vname']]) < 2:
965                    links[l['vname']].append(l['vnode'])
966                else:
967                    nodes.append(l['vname'])
968                    lans[l['vname']] = links[l['vname']]
969                    del links[l['vname']]
970                    lans[l['vname']].append(l['vnode'])
971            elif lans.has_key(l['vname']):
972                lans[l['vname']].append(l['vnode'])
973            else:
974                links[l['vname']] = [ l['vnode'] ]
975
976
977        # Open up a temporary file for dot to turn into a visualization
978        try:
979            df, dotname = tempfile.mkstemp()
980            dotfile = os.fdopen(df, 'w')
981        except IOError:
982            raise service_error(service_error.internal,
983                    "Failed to open file in genviz")
984
985        # Generate a dot/neato input file from the links, nodes and lans
986        try:
987            print >>dotfile, "graph G {"
988            for n in nodes:
989                print >>dotfile, '\t"%s"' % n
990            for l in links.keys():
991                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
992            for l in lans.keys():
993                for n in lans[l]:
994                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
995            print >>dotfile, "}"
996            dotfile.close()
997        except TypeError:
998            raise service_error(service_error.internal,
999                    "Single endpoint link in vtopo")
1000        except IOError:
1001            raise service_error(service_error.internal, "Cannot write dot file")
1002
1003        # Use dot to create a visualization
1004        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1005                '-Gpack=true', dotname], stdout=PIPE, close_fds=True)
1006
1007        # Translate dot to vis format
1008        vis_nodes = [ ]
1009        vis = { 'node': vis_nodes }
1010        for line in dot.stdout:
1011            m = vis_re.match(line)
1012            if m:
1013                vn = m.group(1)
1014                vis_node = {'name': vn, \
1015                        'x': float(m.group(2)),\
1016                        'y' : float(m.group(3)),\
1017                    }
1018                if vn in links.keys() or vn in lans.keys():
1019                    vis_node['type'] = 'lan'
1020                else:
1021                    vis_node['type'] = 'node'
1022                vis_nodes.append(vis_node)
1023        rv = dot.wait()
1024
1025        os.remove(dotname)
1026        if rv == 0 : return vis
1027        else: return None
1028
1029    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1030            access_user):
1031        """
1032        Get access to testbed through fedd and set the parameters for that tb
1033        """
1034        uri = self.tbmap.get(tb, None)
1035        if not uri:
1036            raise service_error(serice_error.server_config, 
1037                    "Unknown testbed: %s" % tb)
1038
1039        # currently this lumps all users into one service access group
1040        service_keys = [ a for u in user \
1041                for a in u.get('access', []) \
1042                    if a.has_key('sshPubkey')]
1043
1044        if len(service_keys) == 0:
1045            raise service_error(service_error.req, 
1046                    "Must have at least one SSH pubkey for services")
1047
1048
1049        for p, u in access_user:
1050            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1051                    "to %s") %  ((p or "None"), u, uri))
1052
1053            if p:
1054                # Request with user and project specified
1055                req = {\
1056                        'destinationTestbed' : { 'uri' : uri },
1057                        'project': { 
1058                            'name': {'localname': p},
1059                            'user': [ {'userID': { 'localname': u } } ],
1060                            },
1061                        'user':  user,
1062                        'allocID' : { 'localname': 'test' },
1063                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1064                        'serviceAccess' : service_keys
1065                    }
1066            else:
1067                # Request with only user specified
1068                req = {\
1069                        'destinationTestbed' : { 'uri' : uri },
1070                        'user':  [ {'userID': { 'localname': u } } ],
1071                        'allocID' : { 'localname': 'test' },
1072                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1073                        'serviceAccess' : service_keys
1074                    }
1075
1076            if tb == master:
1077                # NB, the export_project parameter is a dict that includes
1078                # the type
1079                req['exportProject'] = export_project
1080
1081            # node resources if any
1082            if nodes != None and len(nodes) > 0:
1083                rnodes = [ ]
1084                for n in nodes:
1085                    rn = { }
1086                    image, hw, count = n.split(":")
1087                    if image: rn['image'] = [ image ]
1088                    if hw: rn['hardware'] = [ hw ]
1089                    if count and int(count) >0 : rn['count'] = int(count)
1090                    rnodes.append(rn)
1091                req['resources']= { }
1092                req['resources']['node'] = rnodes
1093
1094            try:
1095                if self.local_access.has_key(uri):
1096                    # Local access call
1097                    req = { 'RequestAccessRequestBody' : req }
1098                    r = self.local_access[uri].RequestAccess(req, 
1099                            fedid(file=self.cert_file))
1100                    r = { 'RequestAccessResponseBody' : r }
1101                else:
1102                    r = self.call_RequestAccess(uri, req, 
1103                            self.cert_file, self.cert_pwd, self.trusted_certs)
1104            except service_error, e:
1105                if e.code == service_error.access:
1106                    self.log.debug("[get_access] Access denied")
1107                    r = None
1108                    continue
1109                else:
1110                    raise e
1111
1112            if r.has_key('RequestAccessResponseBody'):
1113                # Through to here we have a valid response, not a fault.
1114                # Access denied is a fault, so something better or worse than
1115                # access denied has happened.
1116                r = r['RequestAccessResponseBody']
1117                self.log.debug("[get_access] Access granted")
1118                break
1119            else:
1120                raise service_error(service_error.protocol,
1121                        "Bad proxy response")
1122       
1123        if not r:
1124            raise service_error(service_error.access, 
1125                    "Access denied by %s (%s)" % (tb, uri))
1126
1127        e = r['emulab']
1128        p = e['project']
1129        tbparam[tb] = { 
1130                "boss": e['boss'],
1131                "host": e['ops'],
1132                "domain": e['domain'],
1133                "fs": e['fileServer'],
1134                "eventserver": e['eventServer'],
1135                "project": unpack_id(p['name']),
1136                "emulab" : e,
1137                "allocID" : r['allocID'],
1138                }
1139        # Make the testbed name be the label the user applied
1140        p['testbed'] = {'localname': tb }
1141
1142        for u in p['user']:
1143            role = u.get('role', None)
1144            if role == 'experimentCreation':
1145                tbparam[tb]['user'] = unpack_id(u['userID'])
1146                break
1147        else:
1148            raise service_error(service_error.internal, 
1149                    "No createExperimentUser from %s" %tb)
1150
1151        # Add attributes to barameter space.  We don't allow attributes to
1152        # overlay any parameters already installed.
1153        for a in e['fedAttr']:
1154            try:
1155                if a['attribute'] and isinstance(a['attribute'], basestring)\
1156                        and not tbparam[tb].has_key(a['attribute'].lower()):
1157                    tbparam[tb][a['attribute'].lower()] = a['value']
1158            except KeyError:
1159                self.log.error("Bad attribute in response: %s" % a)
1160       
1161    def release_access(self, tb, aid):
1162        """
1163        Release access to testbed through fedd
1164        """
1165
1166        uri = self.tbmap.get(tb, None)
1167        if not uri:
1168            raise service_error(serice_error.server_config, 
1169                    "Unknown testbed: %s" % tb)
1170
1171        if self.local_access.has_key(uri):
1172            resp = self.local_access[uri].ReleaseAccess(\
1173                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1174                    fedid(file=self.cert_file))
1175            resp = { 'ReleaseAccessResponseBody': resp } 
1176        else:
1177            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1178                    self.cert_file, self.cert_pwd, self.trusted_certs)
1179
1180        # better error coding
1181
1182    def remote_splitter(self, uri, desc, master):
1183
1184        req = {
1185                'description' : { 'ns2description': desc },
1186                'master': master,
1187                'include_fedkit': bool(self.fedkit),
1188                'include_gatewaykit': bool(self.gatewaykit)
1189            }
1190
1191        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1192                self.trusted_certs)
1193
1194        if r.has_key('Ns2SplitResponseBody'):
1195            r = r['Ns2SplitResponseBody']
1196            if r.has_key('output'):
1197                return r['output'].splitlines()
1198            else:
1199                raise service_error(service_error.protocol, 
1200                        "Bad splitter response (no output)")
1201        else:
1202            raise service_error(service_error.protocol, "Bad splitter response")
1203       
1204    class current_testbed:
1205        """
1206        Object for collecting the current testbed description.  The testbed
1207        description is saved to a file with the local testbed variables
1208        subsittuted line by line.
1209        """
1210        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1211            def tar_list_to_string(tl):
1212                if tl is None: return None
1213
1214                rv = ""
1215                for t in tl:
1216                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1217                            (t[0], os.path.basename(t[1]))
1218                return rv
1219
1220
1221            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1222            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1223            self.current_testbed = None
1224            self.testbed_file = None
1225
1226            self.def_expstart = \
1227                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1228            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1229            self.def_gwstart = \
1230                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1231            self.def_mgwstart = \
1232                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1233            self.def_gwimage = "FBSD61-TUNNEL2";
1234            self.def_gwtype = "pc";
1235            self.def_mgwcmd = '# '
1236            self.def_mgwcmdparams = ''
1237            self.def_gwcmd = '# '
1238            self.def_gwcmdparams = ''
1239
1240            self.eid = eid
1241            self.tmpdir = tmpdir
1242            # Convert fedkit and gateway kit (which are lists of tuples) into a
1243            # substituition string.
1244            self.fedkit = tar_list_to_string(fedkit)
1245            self.gatewaykit = tar_list_to_string(gatewaykit)
1246
1247        def __call__(self, line, master, allocated, tbparams):
1248            # Capture testbed topology descriptions
1249            if self.current_testbed == None:
1250                m = self.begin_testbed.match(line)
1251                if m != None:
1252                    self.current_testbed = m.group(1)
1253                    if self.current_testbed == None:
1254                        raise service_error(service_error.req,
1255                                "Bad request format (unnamed testbed)")
1256                    allocated[self.current_testbed] = \
1257                            allocated.get(self.current_testbed,0) + 1
1258                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1259                    if not os.path.exists(tb_dir):
1260                        try:
1261                            os.mkdir(tb_dir)
1262                        except IOError:
1263                            raise service_error(service_error.internal,
1264                                    "Cannot create %s" % tb_dir)
1265                    try:
1266                        self.testbed_file = open("%s/%s.%s.tcl" %
1267                                (tb_dir, self.eid, self.current_testbed), 'w')
1268                    except IOError:
1269                        self.testbed_file = None
1270                    return True
1271                else: return False
1272            else:
1273                m = self.end_testbed.match(line)
1274                if m != None:
1275                    if m.group(1) != self.current_testbed:
1276                        raise service_error(service_error.internal, 
1277                                "Mismatched testbed markers!?")
1278                    if self.testbed_file != None: 
1279                        self.testbed_file.close()
1280                        self.testbed_file = None
1281                    self.current_testbed = None
1282                elif self.testbed_file:
1283                    # Substitute variables and put the line into the local
1284                    # testbed file.
1285                    gwtype = tbparams[self.current_testbed].get(\
1286                            'connectortype', self.def_gwtype)
1287                    gwimage = tbparams[self.current_testbed].get(\
1288                            'connectorimage', self.def_gwimage)
1289                    mgwstart = tbparams[self.current_testbed].get(\
1290                            'masterconnectorstartcmd', self.def_mgwstart)
1291                    mexpstart = tbparams[self.current_testbed].get(\
1292                            'masternodestartcmd', self.def_mexpstart)
1293                    gwstart = tbparams[self.current_testbed].get(\
1294                            'slaveconnectorstartcmd', self.def_gwstart)
1295                    expstart = tbparams[self.current_testbed].get(\
1296                            'slavenodestartcmd', self.def_expstart)
1297                    project = tbparams[self.current_testbed].get('project')
1298                    gwcmd = tbparams[self.current_testbed].get(\
1299                            'slaveconnectorcmd', self.def_gwcmd)
1300                    gwcmdparams = tbparams[self.current_testbed].get(\
1301                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1302                    mgwcmd = tbparams[self.current_testbed].get(\
1303                            'masterconnectorcmd', self.def_gwcmd)
1304                    mgwcmdparams = tbparams[self.current_testbed].get(\
1305                            'masterconnectorcmdparams', self.def_gwcmdparams)
1306                    line = re.sub("GWTYPE", gwtype, line)
1307                    line = re.sub("GWIMAGE", gwimage, line)
1308                    if self.current_testbed == master:
1309                        line = re.sub("GWSTART", mgwstart, line)
1310                        line = re.sub("EXPSTART", mexpstart, line)
1311                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1312                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1313                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1314                    else:
1315                        line = re.sub("GWSTART", gwstart, line)
1316                        line = re.sub("EXPSTART", expstart, line)
1317                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1318                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1319                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1320                    #These expansions contain EID and PROJDIR.  NB these are
1321                    # local fedkit and gatewaykit, which are strings.
1322                    if self.fedkit:
1323                        line = re.sub("FEDKIT", self.fedkit, line)
1324                    if self.gatewaykit:
1325                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1326                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1327                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1328                    line = re.sub("EID", self.eid, line)
1329                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1330                            (project, self.eid), line)
1331                    print >>self.testbed_file, line
1332                return True
1333
1334    class allbeds:
1335        """
1336        Process the Allbeds section.  Get access to each federant and save the
1337        parameters in tbparams
1338        """
1339        def __init__(self, get_access):
1340            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1341            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1342            self.in_allbeds = False
1343            self.get_access = get_access
1344
1345        def __call__(self, line, user, tbparams, master, export_project,
1346                access_user):
1347            # Testbed access parameters
1348            if not self.in_allbeds:
1349                if self.begin_allbeds.match(line):
1350                    self.in_allbeds = True
1351                    return True
1352                else:
1353                    return False
1354            else:
1355                if self.end_allbeds.match(line):
1356                    self.in_allbeds = False
1357                else:
1358                    nodes = line.split('|')
1359                    tb = nodes.pop(0)
1360                    self.get_access(tb, nodes, user, tbparams, master,
1361                            export_project, access_user)
1362                return True
1363
1364    class gateways:
1365        def __init__(self, eid, master, tmpdir, gw_pubkey,
1366                gw_secretkey, copy_file, fedkit):
1367            self.begin_gateways = \
1368                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1369            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1370            self.current_gateways = None
1371            self.control_gateway = None
1372            self.active_end = { }
1373
1374            self.eid = eid
1375            self.master = master
1376            self.tmpdir = tmpdir
1377            self.gw_pubkey_base = gw_pubkey
1378            self.gw_secretkey_base = gw_secretkey
1379
1380            self.copy_file = copy_file
1381            self.fedkit = fedkit
1382
1383
1384        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1385                active_end, tbparams, dtb, myname, desthost, type):
1386            """
1387            Produce a gateway configuration file from a gateways line.
1388            """
1389
1390            sproject = tbparams[gw].get('project', 'project')
1391            dproject = tbparams[dtb].get('project', 'project')
1392            sdomain = ".%s.%s%s" % (eid, sproject,
1393                    tbparams[gw].get('domain', ".example.com"))
1394            ddomain = ".%s.%s%s" % (eid, dproject,
1395                    tbparams[dtb].get('domain', ".example.com"))
1396            boss = tbparams[master].get('boss', "boss")
1397            fs = tbparams[master].get('fs', "fs")
1398            event_server = "%s%s" % \
1399                    (tbparams[gw].get('eventserver', "event_server"),
1400                            tbparams[gw].get('domain', "example.com"))
1401            remote_event_server = "%s%s" % \
1402                    (tbparams[dtb].get('eventserver', "event_server"),
1403                            tbparams[dtb].get('domain', "example.com"))
1404            seer_control = "%s%s" % \
1405                    (tbparams[gw].get('control', "control"), sdomain)
1406            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1407
1408            if self.fedkit:
1409                remote_script_dir = "/usr/local/federation/bin"
1410                local_script_dir = "/usr/local/federation/bin"
1411            else:
1412                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1413                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1414
1415            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1416            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1417            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1418
1419            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1420            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1421
1422            # translate to lower case so the `hostname` hack for specifying
1423            # configuration files works.
1424            conf_file = conf_file.lower();
1425            remote_conf_file = remote_conf_file.lower();
1426
1427            if dtb == master:
1428                active = "false"
1429            elif gw == master:
1430                active = "true"
1431            elif active_end.has_key('%s-%s' % (dtb, gw)):
1432                active = "false"
1433            else:
1434                active_end['%s-%s' % (gw, dtb)] = 1
1435                active = "true"
1436
1437            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1438            print >>gwconfig, "Active: %s" % active
1439            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1440            if tunnel_iface:
1441                print >>gwconfig, "Interface: %s" % tunnel_iface
1442            print >>gwconfig, "BossName: %s" % boss
1443            print >>gwconfig, "FsName: %s" % fs
1444            print >>gwconfig, "EventServerName: %s" % event_server
1445            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1446            print >>gwconfig, "SeerControl: %s" % seer_control
1447            print >>gwconfig, "Type: %s" % type
1448            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1449            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1450                    local_script_dir
1451            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1452            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1453            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1454                    (remote_conf_dir, remote_conf_file)
1455            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1456            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1457            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1458            gwconfig.close()
1459
1460            return active == "true"
1461
1462        def __call__(self, line, allocated, tbparams):
1463            # Process gateways
1464            if not self.current_gateways:
1465                m = self.begin_gateways.match(line)
1466                if m:
1467                    self.current_gateways = m.group(1)
1468                    if allocated.has_key(self.current_gateways):
1469                        # This test should always succeed
1470                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1471                        if not os.path.exists(tb_dir):
1472                            try:
1473                                os.mkdir(tb_dir)
1474                            except IOError:
1475                                raise service_error(service_error.internal,
1476                                        "Cannot create %s" % tb_dir)
1477                    else:
1478                        # XXX
1479                        self.log.error("[gateways]: Ignoring gateways for " + \
1480                                "unknown testbed %s" % self.current_gateways)
1481                        self.current_gateways = None
1482                    return True
1483                else:
1484                    return False
1485            else:
1486                m = self.end_gateways.match(line)
1487                if m :
1488                    if m.group(1) != self.current_gateways:
1489                        raise service_error(service_error.internal,
1490                                "Mismatched gateway markers!?")
1491                    if self.control_gateway:
1492                        try:
1493                            cc = open("%s/%s/client.conf" %
1494                                    (self.tmpdir, self.current_gateways), 'w')
1495                            print >>cc, "ControlGateway: %s" % \
1496                                    self.control_gateway
1497                            if tbparams[self.master].has_key('smbshare'):
1498                                print >>cc, "SMBSHare: %s" % \
1499                                        tbparams[self.master]['smbshare']
1500                            print >>cc, "ProjectUser: %s" % \
1501                                    tbparams[self.master]['user']
1502                            print >>cc, "ProjectName: %s" % \
1503                                    tbparams[self.master]['project']
1504                            print >>cc, "ExperimentID: %s/%s" % \
1505                                    ( tbparams[self.master]['project'], \
1506                                    self.eid )
1507                            cc.close()
1508                        except IOError:
1509                            raise service_error(service_error.internal,
1510                                    "Error creating client config")
1511                        # XXX: This seer specific file should disappear
1512                        try:
1513                            cc = open("%s/%s/seer.conf" %
1514                                    (self.tmpdir, self.current_gateways),
1515                                    'w')
1516                            if self.current_gateways != self.master:
1517                                print >>cc, "ControlNode: %s" % \
1518                                        self.control_gateway
1519                            print >>cc, "ExperimentID: %s/%s" % \
1520                                    ( tbparams[self.master]['project'], \
1521                                    self.eid )
1522                            cc.close()
1523                        except IOError:
1524                            raise service_error(service_error.internal,
1525                                    "Error creating seer config")
1526                    else:
1527                        debug.error("[gateways]: No control gateway for %s" %\
1528                                    self.current_gateways)
1529                    self.current_gateways = None
1530                else:
1531                    dtb, myname, desthost, type = line.split(" ")
1532
1533                    if type == "control" or type == "both":
1534                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1535                                self.eid, 
1536                                tbparams[self.current_gateways]['project'],
1537                                tbparams[self.current_gateways]['domain'])
1538                    try:
1539                        active = self.gateway_conf_file(self.current_gateways,
1540                                self.master, self.eid, self.gw_pubkey_base,
1541                                self.gw_secretkey_base,
1542                                self.active_end, tbparams, dtb, myname,
1543                                desthost, type)
1544                    except IOError, e:
1545                        raise service_error(service_error.internal,
1546                                "Failed to write config file for %s" % \
1547                                        self.current_gateway)
1548           
1549                    gw_pubkey = "%s/keys/%s" % \
1550                            (self.tmpdir, self.gw_pubkey_base)
1551                    gw_secretkey = "%s/keys/%s" % \
1552                            (self.tmpdir, self.gw_secretkey_base)
1553
1554                    pkfile = "%s/%s/%s" % \
1555                            ( self.tmpdir, self.current_gateways, 
1556                                    self.gw_pubkey_base)
1557                    skfile = "%s/%s/%s" % \
1558                            ( self.tmpdir, self.current_gateways, 
1559                                    self.gw_secretkey_base)
1560
1561                    if not os.path.exists(pkfile):
1562                        try:
1563                            self.copy_file(gw_pubkey, pkfile)
1564                        except IOError:
1565                            service_error(service_error.internal,
1566                                    "Failed to copy pubkey file")
1567
1568                    if active and not os.path.exists(skfile):
1569                        try:
1570                            self.copy_file(gw_secretkey, skfile)
1571                        except IOError:
1572                            service_error(service_error.internal,
1573                                    "Failed to copy secretkey file")
1574                return True
1575
1576    class shunt_to_file:
1577        """
1578        Simple class to write data between two regexps to a file.
1579        """
1580        def __init__(self, begin, end, filename):
1581            """
1582            Begin shunting on a match of begin, stop on end, send data to
1583            filename.
1584            """
1585            self.begin = re.compile(begin)
1586            self.end = re.compile(end)
1587            self.in_shunt = False
1588            self.file = None
1589            self.filename = filename
1590
1591        def __call__(self, line):
1592            """
1593            Call this on each line in the input that may be shunted.
1594            """
1595            if not self.in_shunt:
1596                if self.begin.match(line):
1597                    self.in_shunt = True
1598                    try:
1599                        self.file = open(self.filename, "w")
1600                    except:
1601                        self.file = None
1602                        raise
1603                    return True
1604                else:
1605                    return False
1606            else:
1607                if self.end.match(line):
1608                    if self.file: 
1609                        self.file.close()
1610                        self.file = None
1611                    self.in_shunt = False
1612                else:
1613                    if self.file:
1614                        print >>self.file, line
1615                return True
1616
1617    class shunt_to_list:
1618        """
1619        Same interface as shunt_to_file.  Data collected in self.list, one list
1620        element per line.
1621        """
1622        def __init__(self, begin, end):
1623            self.begin = re.compile(begin)
1624            self.end = re.compile(end)
1625            self.in_shunt = False
1626            self.list = [ ]
1627       
1628        def __call__(self, line):
1629            if not self.in_shunt:
1630                if self.begin.match(line):
1631                    self.in_shunt = True
1632                    return True
1633                else:
1634                    return False
1635            else:
1636                if self.end.match(line):
1637                    self.in_shunt = False
1638                else:
1639                    self.list.append(line)
1640                return True
1641
1642    class shunt_to_string:
1643        """
1644        Same interface as shunt_to_file.  Data collected in self.str, all in
1645        one string.
1646        """
1647        def __init__(self, begin, end):
1648            self.begin = re.compile(begin)
1649            self.end = re.compile(end)
1650            self.in_shunt = False
1651            self.str = ""
1652       
1653        def __call__(self, line):
1654            if not self.in_shunt:
1655                if self.begin.match(line):
1656                    self.in_shunt = True
1657                    return True
1658                else:
1659                    return False
1660            else:
1661                if self.end.match(line):
1662                    self.in_shunt = False
1663                else:
1664                    self.str += line
1665                return True
1666
1667    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1668            tbparams, tmpdir, alloc_log=None):
1669        started = { }           # Testbeds where a sub-experiment started
1670                                # successfully
1671
1672        # XXX
1673        fail_soft = False
1674
1675        log = alloc_log or self.log
1676
1677        thread_pool = self.thread_pool(self.nthreads)
1678        threads = [ ]
1679
1680        for tb in [ k for k in allocated.keys() if k != master]:
1681            # Create and start a thread to start the segment, and save it to
1682            # get the return value later
1683            thread_pool.wait_for_slot()
1684            t  = self.pooled_thread(\
1685                    target=self.start_segment(log=log,
1686                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1687                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1688                    pdata=thread_pool, trace_file=self.trace_file)
1689            threads.append(t)
1690            t.start()
1691
1692        # Wait until all finish
1693        thread_pool.wait_for_all_done()
1694
1695        # If none failed, start the master
1696        failed = [ t.getName() for t in threads if not t.rv ]
1697
1698        if len(failed) == 0:
1699            starter = self.start_segment(log=log, 
1700                    keyfile=self.ssh_privkey_file, debug=self.debug)
1701            if not starter(master, eid, tbparams, tmpdir):
1702                failed.append(master)
1703
1704        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1705        # If one failed clean up, unless fail_soft is set
1706        if failed:
1707            if not fail_soft:
1708                thread_pool.clear()
1709                for tb in succeeded:
1710                    # Create and start a thread to stop the segment
1711                    thread_pool.wait_for_slot()
1712                    t  = self.pooled_thread(\
1713                            target=self.stop_segment(log=log,
1714                                keyfile=self.ssh_privkey_file,
1715                                debug=self.debug), 
1716                            args=(tb, eid, tbparams), name=tb,
1717                            pdata=thread_pool, trace_file=self.trace_file)
1718                    t.start()
1719                # Wait until all finish
1720                thread_pool.wait_for_all_done()
1721
1722                # release the allocations
1723                for tb in tbparams.keys():
1724                    self.release_access(tb, tbparams[tb]['allocID'])
1725                # Remove the placeholder
1726                self.state_lock.acquire()
1727                self.state[eid]['experimentStatus'] = 'failed'
1728                if self.state_filename: self.write_state()
1729                self.state_lock.release()
1730
1731                #raise service_error(service_error.federant,
1732                #    "Swap in failed on %s" % ",".join(failed))
1733                log.error("Swap in failed on %s" % ",".join(failed))
1734                return
1735        else:
1736            log.info("[start_segment]: Experiment %s active" % eid)
1737
1738        log.debug("[start_experiment]: removing %s" % tmpdir)
1739
1740        # Walk up tmpdir, deleting as we go
1741        for path, dirs, files in os.walk(tmpdir, topdown=False):
1742            for f in files:
1743                os.remove(os.path.join(path, f))
1744            for d in dirs:
1745                os.rmdir(os.path.join(path, d))
1746        os.rmdir(tmpdir)
1747
1748        # Insert the experiment into our state and update the disk copy
1749        self.state_lock.acquire()
1750        self.state[expid]['experimentStatus'] = 'active'
1751        self.state[eid] = self.state[expid]
1752        if self.state_filename: self.write_state()
1753        self.state_lock.release()
1754        return
1755
1756    def create_experiment(self, req, fid):
1757        """
1758        The external interface to experiment creation called from the
1759        dispatcher.
1760
1761        Creates a working directory, splits the incoming description using the
1762        splitter script and parses out the avrious subsections using the
1763        lcasses above.  Once each sub-experiment is created, use pooled threads
1764        to instantiate them and start it all up.
1765        """
1766
1767        if not self.auth.check_attribute(fid, 'create'):
1768            raise service_error(service_error.access, "Create access denied")
1769
1770        try:
1771            tmpdir = tempfile.mkdtemp(prefix="split-")
1772        except IOError:
1773            raise service_error(service_error.internal, "Cannot create tmp dir")
1774
1775        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1776        gw_secretkey_base = "fed.%s" % self.ssh_type
1777        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1778        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1779        tclfile = tmpdir + "/experiment.tcl"
1780        tbparams = { }
1781        try:
1782            access_user = self.accessdb[fid]
1783        except KeyError:
1784            raise service_error(service_error.internal,
1785                    "Access map and authorizer out of sync in " + \
1786                            "create_experiment for fedid %s"  % fid)
1787
1788        pid = "dummy"
1789        gid = "dummy"
1790        try:
1791            os.mkdir(tmpdir+"/keys")
1792        except OSError:
1793            raise service_error(service_error.internal,
1794                    "Can't make temporary dir")
1795
1796        req = req.get('CreateRequestBody', None)
1797        if not req:
1798            raise service_error(service_error.req,
1799                    "Bad request format (no CreateRequestBody)")
1800        # The tcl parser needs to read a file so put the content into that file
1801        descr=req.get('experimentdescription', None)
1802        if descr:
1803            file_content=descr.get('ns2description', None)
1804            if file_content:
1805                try:
1806                    f = open(tclfile, 'w')
1807                    f.write(file_content)
1808                    f.close()
1809                except IOError:
1810                    raise service_error(service_error.internal,
1811                            "Cannot write temp experiment description")
1812            else:
1813                raise service_error(service_error.req, 
1814                        "Only ns2descriptions supported")
1815        else:
1816            raise service_error(service_error.req, "No experiment description")
1817
1818        # Generate an ID for the experiment (slice) and a certificate that the
1819        # allocator can use to prove they own it.  We'll ship it back through
1820        # the encrypted connection.
1821        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1822
1823        if req.has_key('experimentID') and \
1824                req['experimentID'].has_key('localname'):
1825            eid = req['experimentID']['localname']
1826            self.state_lock.acquire()
1827            while (self.state.has_key(eid)):
1828                eid += random.choice(string.ascii_letters)
1829            # Initial state
1830            self.state[eid] = {
1831                    'experimentID' : \
1832                            [ { 'localname' : eid }, {'fedid': expid } ],
1833                    'experimentStatus': 'starting',
1834                    'experimentAccess': { 'X509' : expcert },
1835                    'owner': fid,
1836                    'log' : [],
1837                }
1838            self.state[expid] = self.state[eid]
1839            if self.state_filename: self.write_state()
1840            self.state_lock.release()
1841        else:
1842            eid = self.exp_stem
1843            for i in range(0,5):
1844                eid += random.choice(string.ascii_letters)
1845            self.state_lock.acquire()
1846            while (self.state.has_key(eid)):
1847                eid = self.exp_stem
1848                for i in range(0,5):
1849                    eid += random.choice(string.ascii_letters)
1850            # Initial state
1851            self.state[eid] = {
1852                    'experimentID' : \
1853                            [ { 'localname' : eid }, {'fedid': expid } ],
1854                    'experimentStatus': 'starting',
1855                    'experimentAccess': { 'X509' : expcert },
1856                    'owner': fid,
1857                    'log' : [],
1858                }
1859            self.state[expid] = self.state[eid]
1860            if self.state_filename: self.write_state()
1861            self.state_lock.release()
1862
1863        try: 
1864            # This catches exceptions to clear the placeholder if necessary
1865            try:
1866                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1867            except ValueError:
1868                raise service_error(service_error.server_config, 
1869                        "Bad key type (%s)" % self.ssh_type)
1870
1871            user = req.get('user', None)
1872            if user == None:
1873                raise service_error(service_error.req, "No user")
1874
1875            master = req.get('master', None)
1876            if not master:
1877                raise service_error(service_error.req,
1878                        "No master testbed label")
1879            export_project = req.get('exportProject', None)
1880            if not export_project:
1881                raise service_error(service_error.req, "No export project")
1882           
1883            if self.splitter_url:
1884                self.log.debug("Calling remote splitter at %s" % \
1885                        self.splitter_url)
1886                split_data = self.remote_splitter(self.splitter_url,
1887                        file_content, master)
1888            else:
1889                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1890                    str(self.muxmax), '-m', master]
1891
1892                if self.fedkit:
1893                    tclcmd.append('-k')
1894
1895                if self.gatewaykit:
1896                    tclcmd.append('-K')
1897
1898                tclcmd.extend([pid, gid, eid, tclfile])
1899
1900                self.log.debug("running local splitter %s", " ".join(tclcmd))
1901                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True)
1902                split_data = tclparser.stdout
1903
1904            allocated = { }         # Testbeds we can access
1905            # Objects to parse the splitter output (defined above)
1906            parse_current_testbed = self.current_testbed(eid, tmpdir,
1907                    self.fedkit, self.gatewaykit)
1908            parse_allbeds = self.allbeds(self.get_access)
1909            parse_gateways = self.gateways(eid, master, tmpdir,
1910                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1911                    self.fedkit)
1912            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1913                        "^#\s+End\s+Vtopo")
1914            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1915                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1916            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1917                    "^#\s+End\s+tarfiles")
1918            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1919                    "^#\s+End\s+rpms")
1920
1921            # Working on the split data
1922            for line in split_data:
1923                line = line.rstrip()
1924                if parse_current_testbed(line, master, allocated, tbparams):
1925                    continue
1926                elif parse_allbeds(line, user, tbparams, master, export_project,
1927                        access_user):
1928                    continue
1929                elif parse_gateways(line, allocated, tbparams):
1930                    continue
1931                elif parse_vtopo(line):
1932                    continue
1933                elif parse_hostnames(line):
1934                    continue
1935                elif parse_tarfiles(line):
1936                    continue
1937                elif parse_rpms(line):
1938                    continue
1939                else:
1940                    raise service_error(service_error.internal, 
1941                            "Bad tcl parse? %s" % line)
1942            # Virtual topology and visualization
1943            vtopo = self.gentopo(parse_vtopo.str)
1944            if not vtopo:
1945                raise service_error(service_error.internal, 
1946                        "Failed to generate virtual topology")
1947
1948            vis = self.genviz(vtopo)
1949            if not vis:
1950                raise service_error(service_error.internal, 
1951                        "Failed to generate visualization")
1952
1953           
1954            # save federant information
1955            for k in allocated.keys():
1956                tbparams[k]['federant'] = {\
1957                        'name': [ { 'localname' : eid} ],\
1958                        'emulab': tbparams[k]['emulab'],\
1959                        'allocID' : tbparams[k]['allocID'],\
1960                        'master' : k == master,\
1961                    }
1962
1963            self.state_lock.acquire()
1964            self.state[eid]['vtopo'] = vtopo
1965            self.state[eid]['vis'] = vis
1966            self.state[expid]['federant'] = \
1967                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1968                        if tbparams[tb].has_key('federant') ]
1969            if self.state_filename: self.write_state()
1970            self.state_lock.release()
1971
1972            # Copy tarfiles and rpms needed at remote sites into a staging area
1973            try:
1974                if self.fedkit:
1975                    for t in self.fedkit:
1976                        parse_tarfiles.list.append(t[1])
1977                if self.gatewaykit:
1978                    for t in self.gatewaykit:
1979                        parse_tarfiles.list.append(t[1])
1980                for t in parse_tarfiles.list:
1981                    if not os.path.exists("%s/tarfiles" % tmpdir):
1982                        os.mkdir("%s/tarfiles" % tmpdir)
1983                    self.copy_file(t, "%s/tarfiles/%s" % \
1984                            (tmpdir, os.path.basename(t)))
1985                for r in parse_rpms.list:
1986                    if not os.path.exists("%s/rpms" % tmpdir):
1987                        os.mkdir("%s/rpms" % tmpdir)
1988                    self.copy_file(r, "%s/rpms/%s" % \
1989                            (tmpdir, os.path.basename(r)))
1990                # A null experiment file in case we need to create a remote
1991                # experiment from scratch
1992                f = open("%s/null.tcl" % tmpdir, "w")
1993                print >>f, """
1994set ns [new Simulator]
1995source tb_compat.tcl
1996
1997set a [$ns node]
1998
1999$ns rtproto Session
2000$ns run
2001"""
2002                f.close()
2003
2004            except IOError, e:
2005                raise service_error(service_error.internal, 
2006                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2007
2008        except service_error, e:
2009            # If something goes wrong in the parse (usually an access error)
2010            # clear the placeholder state.  From here on out the code delays
2011            # exceptions.  Failing at this point returns a fault to the remote
2012            # caller.
2013            self.state_lock.acquire()
2014            del self.state[eid]
2015            del self.state[expid]
2016            if self.state_filename: self.write_state()
2017            self.state_lock.release()
2018            raise e
2019
2020
2021        # Start the background swapper and return the starting state.  From
2022        # here on out, the state will stick around a while.
2023
2024        # Let users touch the state
2025        self.auth.set_attribute(fid, expid)
2026        self.auth.set_attribute(expid, expid)
2027        # Override fedids can manipulate state as well
2028        for o in self.overrides:
2029            self.auth.set_attribute(o, expid)
2030
2031        # Create a logger that logs to the experiment's state object as well as
2032        # to the main log file.
2033
2034        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2035        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2036        # XXX: there should be a global one of these rather than repeating the
2037        # code.
2038        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2039                    '%d %b %y %H:%M:%S'))
2040        alloc_log.addHandler(h)
2041       
2042
2043
2044
2045
2046        # Start a thread to do the resource allocation
2047        t  = Thread(target=self.allocate_resources,
2048                args=(allocated, master, eid, expid, expcert, tbparams, 
2049                    tmpdir, alloc_log),
2050                name=eid)
2051        t.start()
2052
2053        rv = {
2054                'experimentID': [
2055                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2056                ],
2057                'experimentStatus': 'starting',
2058                'experimentAccess': { 'X509' : expcert }
2059            }
2060
2061        return rv
2062
2063    def check_experiment_access(self, fid, key):
2064        """
2065        Confirm that the fid has access to the experiment.  Though a request
2066        may be made in terms of a local name, the access attribute is always
2067        the experiment's fedid.
2068        """
2069        if not isinstance(key, fedid):
2070            self.state_lock.acquire()
2071            if self.state.has_key(key):
2072                if isinstance(self.state[key], dict):
2073                    try:
2074                        kl = [ f['fedid'] for f in \
2075                                self.state[key]['experimentID']\
2076                                    if f.has_key('fedid') ]
2077                    except KeyError:
2078                        self.state_lock.release()
2079                        raise service_error(service_error.internal, 
2080                                "No fedid for experiment %s when checking " +\
2081                                        "access(!?)" % key)
2082                    if len(kl) == 1:
2083                        key = kl[0]
2084                    else:
2085                        self.state_lock.release()
2086                        raise service_error(service_error.internal, 
2087                                "multiple fedids for experiment %s when " +\
2088                                        "checking access(!?)" % key)
2089                elif isinstance(self.state[key], str):
2090                    self.state_lock.release()
2091                    raise service_error(service_error.internal, 
2092                            ("experiment %s is placeholder.  " +\
2093                                    "Creation in progress or aborted oddly") \
2094                                    % key)
2095                else:
2096                    self.state_lock.release()
2097                    raise service_error(service_error.internal, 
2098                            "Unexpected state for %s" % key)
2099
2100            else:
2101                self.state_lock.release()
2102                raise service_error(service_error.access, "Access Denied")
2103            self.state_lock.release()
2104
2105        if self.auth.check_attribute(fid, key):
2106            return True
2107        else:
2108            raise service_error(service_error.access, "Access Denied")
2109
2110
2111
2112    def get_vtopo(self, req, fid):
2113        """
2114        Return the stored virtual topology for this experiment
2115        """
2116        rv = None
2117        state = None
2118
2119        req = req.get('VtopoRequestBody', None)
2120        if not req:
2121            raise service_error(service_error.req,
2122                    "Bad request format (no VtopoRequestBody)")
2123        exp = req.get('experiment', None)
2124        if exp:
2125            if exp.has_key('fedid'):
2126                key = exp['fedid']
2127                keytype = "fedid"
2128            elif exp.has_key('localname'):
2129                key = exp['localname']
2130                keytype = "localname"
2131            else:
2132                raise service_error(service_error.req, "Unknown lookup type")
2133        else:
2134            raise service_error(service_error.req, "No request?")
2135
2136        self.check_experiment_access(fid, key)
2137
2138        self.state_lock.acquire()
2139        if self.state.has_key(key):
2140            if self.state[key].has_key('vtopo'):
2141                rv = { 'experiment' : {keytype: key },\
2142                        'vtopo': self.state[key]['vtopo'],\
2143                    }
2144            else:
2145                state = self.state[key]['experimentStatus']
2146        self.state_lock.release()
2147
2148        if rv: return rv
2149        else: 
2150            if state:
2151                raise service_error(service_error.partial, 
2152                        "Not ready: %s" % state)
2153            else:
2154                raise service_error(service_error.req, "No such experiment")
2155
2156    def get_vis(self, req, fid):
2157        """
2158        Return the stored visualization for this experiment
2159        """
2160        rv = None
2161        state = None
2162
2163        req = req.get('VisRequestBody', None)
2164        if not req:
2165            raise service_error(service_error.req,
2166                    "Bad request format (no VisRequestBody)")
2167        exp = req.get('experiment', None)
2168        if exp:
2169            if exp.has_key('fedid'):
2170                key = exp['fedid']
2171                keytype = "fedid"
2172            elif exp.has_key('localname'):
2173                key = exp['localname']
2174                keytype = "localname"
2175            else:
2176                raise service_error(service_error.req, "Unknown lookup type")
2177        else:
2178            raise service_error(service_error.req, "No request?")
2179
2180        self.check_experiment_access(fid, key)
2181
2182        self.state_lock.acquire()
2183        if self.state.has_key(key):
2184            if self.state[key].has_key('vis'):
2185                rv =  { 'experiment' : {keytype: key },\
2186                        'vis': self.state[key]['vis'],\
2187                        }
2188            else:
2189                state = self.state[key]['experimentStatus']
2190        self.state_lock.release()
2191
2192        if rv: return rv
2193        else:
2194            if state:
2195                raise service_error(service_error.partial, 
2196                        "Not ready: %s" % state)
2197            else:
2198                raise service_error(service_error.req, "No such experiment")
2199
2200    def clean_info_response(self, rv):
2201        """
2202        Remove the information in the experiment's state object that is not in
2203        the info response.
2204        """
2205        # Remove the owner info (should always be there, but...)
2206        if rv.has_key('owner'): del rv['owner']
2207
2208        # Convert the log into the allocationLog parameter and remove the
2209        # log entry (with defensive programming)
2210        if rv.has_key('log'):
2211            rv['allocationLog'] = "".join(rv['log'])
2212            del rv['log']
2213        else:
2214            rv['allocationLog'] = ""
2215
2216        if rv['experimentStatus'] != 'active':
2217            if rv.has_key('federant'): del rv['federant']
2218        else:
2219            # remove the allocationID info from each federant
2220            for f in rv.get('federant', []):
2221                if f.has_key('allocID'): del f['allocID']
2222
2223        return rv
2224
2225    def get_info(self, req, fid):
2226        """
2227        Return all the stored info about this experiment
2228        """
2229        rv = None
2230
2231        req = req.get('InfoRequestBody', None)
2232        if not req:
2233            raise service_error(service_error.req,
2234                    "Bad request format (no InfoRequestBody)")
2235        exp = req.get('experiment', None)
2236        if exp:
2237            if exp.has_key('fedid'):
2238                key = exp['fedid']
2239                keytype = "fedid"
2240            elif exp.has_key('localname'):
2241                key = exp['localname']
2242                keytype = "localname"
2243            else:
2244                raise service_error(service_error.req, "Unknown lookup type")
2245        else:
2246            raise service_error(service_error.req, "No request?")
2247
2248        self.check_experiment_access(fid, key)
2249
2250        # The state may be massaged by the service function that called
2251        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2252        # state.
2253        self.state_lock.acquire()
2254        if self.state.has_key(key):
2255            rv = copy.deepcopy(self.state[key])
2256        self.state_lock.release()
2257
2258        if rv:
2259            return self.clean_info_response(rv)
2260        else:
2261            raise service_error(service_error.req, "No such experiment")
2262
2263    def get_multi_info(self, req, fid):
2264        """
2265        Return all the stored info that this fedid can access
2266        """
2267        rv = { 'info': [ ] }
2268
2269        self.state_lock.acquire()
2270        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2271            self.check_experiment_access(fid, key)
2272
2273            if self.state.has_key(key):
2274                e = copy.deepcopy(self.state[key])
2275                e = self.clean_info_response(e)
2276                rv['info'].append(e)
2277        self.state_lock.release()
2278        return rv
2279
2280
2281    def terminate_experiment(self, req, fid):
2282        """
2283        Swap this experiment out on the federants and delete the shared
2284        information
2285        """
2286        tbparams = { }
2287        req = req.get('TerminateRequestBody', None)
2288        if not req:
2289            raise service_error(service_error.req,
2290                    "Bad request format (no TerminateRequestBody)")
2291        force = req.get('force', False)
2292        exp = req.get('experiment', None)
2293        if exp:
2294            if exp.has_key('fedid'):
2295                key = exp['fedid']
2296                keytype = "fedid"
2297            elif exp.has_key('localname'):
2298                key = exp['localname']
2299                keytype = "localname"
2300            else:
2301                raise service_error(service_error.req, "Unknown lookup type")
2302        else:
2303            raise service_error(service_error.req, "No request?")
2304
2305        self.check_experiment_access(fid, key)
2306
2307        self.state_lock.acquire()
2308        fed_exp = self.state.get(key, None)
2309
2310        if fed_exp:
2311            # This branch of the conditional holds the lock to generate a
2312            # consistent temporary tbparams variable to deallocate experiments.
2313            # It releases the lock to do the deallocations and reacquires it to
2314            # remove the experiment state when the termination is complete.
2315
2316            # First make sure that the experiment creation is complete.
2317            status = fed_exp.get('experimentStatus', None)
2318            if status:
2319                if status == 'starting':
2320                    if not force:
2321                        self.state_lock.release()
2322                        raise service_error(service_error.partial, 
2323                                'Experiment still being created')
2324                    else:
2325                        self.log.warning('Experiment in starting state ' + \
2326                                'being terminated by admin.')
2327            else:
2328                # No status??? trouble
2329                self.state_lock.release()
2330                raise service_error(service_error.internal,
2331                        "Experiment has no status!?")
2332
2333            ids = []
2334            #  experimentID is a list of dicts that are self-describing
2335            #  identifiers.  This finds all the fedids and localnames - the
2336            #  keys of self.state - and puts them into ids.
2337            for id in fed_exp.get('experimentID', []):
2338                if id.has_key('fedid'): ids.append(id['fedid'])
2339                if id.has_key('localname'): ids.append(id['localname'])
2340
2341            # Construct enough of the tbparams to make the stop_segment calls
2342            # work
2343            for fed in fed_exp.get('federant', []):
2344                try:
2345                    for e in fed['name']:
2346                        eid = e.get('localname', None)
2347                        if eid: break
2348                    else:
2349                        continue
2350
2351                    p = fed['emulab']['project']
2352
2353                    project = p['name']['localname']
2354                    tb = p['testbed']['localname']
2355                    user = p['user'][0]['userID']['localname']
2356
2357                    domain = fed['emulab']['domain']
2358                    host  = fed['emulab']['ops']
2359                    aid = fed['allocID']
2360                except KeyError, e:
2361                    continue
2362                tbparams[tb] = {\
2363                        'user': user,\
2364                        'domain': domain,\
2365                        'project': project,\
2366                        'host': host,\
2367                        'eid': eid,\
2368                        'aid': aid,\
2369                    }
2370            self.state_lock.release()
2371
2372            # Stop everyone.
2373            thread_pool = self.thread_pool(self.nthreads)
2374            for tb in tbparams.keys():
2375                # Create and start a thread to stop the segment
2376                thread_pool.wait_for_slot()
2377                t  = self.pooled_thread(\
2378                        target=self.stop_segment(log=self.log,
2379                            keyfile=self.ssh_privkey_file, debug=self.debug), 
2380                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2381                        pdata=thread_pool, trace_file=self.trace_file)
2382                t.start()
2383            # Wait for completions
2384            thread_pool.wait_for_all_done()
2385
2386            # release the allocations (failed experiments have done this
2387            # already, and starting experiments may be in odd states, so we
2388            # ignore errors releasing those allocations
2389            try: 
2390                for tb in tbparams.keys():
2391                    self.release_access(tb, tbparams[tb]['aid'])
2392            except service_error, e:
2393                if status != 'failed' and not force:
2394                    raise e
2395
2396            # Remove the terminated experiment
2397            self.state_lock.acquire()
2398            for id in ids:
2399                if self.state.has_key(id): del self.state[id]
2400
2401            if self.state_filename: self.write_state()
2402            self.state_lock.release()
2403
2404            return { 'experiment': exp }
2405        else:
2406            # Don't forget to release the lock
2407            self.state_lock.release()
2408            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.