source: fedd/federation/experiment_control.py @ 9479343

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 9479343 was 9479343, checked in by Ted Faber <faber@…>, 15 years ago

allow an owner to overwrite a filed experiment without terminating it

  • Property mode set to 100644
File size: 90.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
550            """
551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
555
556            try:
557                dnull = open("/dev/null", "r")
558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
568                else:
569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
580                    else:
581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
585                else:
586                    return sub.wait() == 0
587            else:
588                if timeout == 0:
589                    self.log.debug("debug timeout raised on %s " % sh_str)
590                    raise self.ssh_cmd_timeout()
591                else:
592                    return True
593
594    class start_segment(emulab_segment):
595        def __init__(self, log=None, keyfile=None, debug=False):
596            experiment_control_local.emulab_segment.__init__(self,
597                    log=log, keyfile=keyfile, debug=debug)
598
599        def create_config_tree(self, src_dir, dest_dir, script):
600            """
601            Append commands to script that will create the directory hierarchy
602            on the remote federant.
603            """
604
605            if os.path.isdir(src_dir):
606                print >>script, "mkdir -p %s" % dest_dir
607                print >>script, "chmod 770 %s" % dest_dir
608
609                for f in os.listdir(src_dir):
610                    if os.path.isdir(f):
611                        self.create_config_tree("%s/%s" % (src_dir, f), 
612                                "%s/%s" % (dest_dir, f), script)
613            else:
614                self.log.debug("[create_config_tree]: Not a directory: %s" \
615                        % src_dir)
616
617        def ship_configs(self, host, user, src_dir, dest_dir):
618            """
619            Copy federant-specific configuration files to the federant.
620            """
621            for f in os.listdir(src_dir):
622                if os.path.isdir(f):
623                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
624                            "%s/%s" % (dest_dir, f)):
625                        return False
626                else:
627                    if not self.scp_file("%s/%s" % (src_dir, f), 
628                            user, host, dest_dir):
629                        return False
630            return True
631
632        def get_state(self, user, host, tb, pid, eid):
633            # command to test experiment state
634            expinfo_exec = "/usr/testbed/bin/expinfo" 
635            # Regular expressions to parse the expinfo response
636            state_re = re.compile("State:\s+(\w+)")
637            no_exp_re = re.compile("^No\s+such\s+experiment")
638            swapping_re = re.compile("^No\s+information\s+available.")
639            state = None    # Experiment state parsed from expinfo
640            # The expinfo ssh command.  Note the identity restriction to use
641            # only the identity provided in the pubkey given.
642            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
643                    'StrictHostKeyChecking yes', '-i', 
644                    self.ssh_privkey_file, "%s@%s" % (user, host), 
645                    expinfo_exec, pid, eid]
646
647            dev_null = None
648            try:
649                dev_null = open("/dev/null", "a")
650            except IOError, e:
651                self.log.error("[get_state]: can't open /dev/null: %s" %e)
652
653            if self.debug:
654                state = 'swapped'
655                rv = 0
656            else:
657                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
658                        close_fds=True)
659                for line in status.stdout:
660                    m = state_re.match(line)
661                    if m: state = m.group(1)
662                    else:
663                        for reg, st in ((no_exp_re, "none"),
664                                (swapping_re, "swapping")):
665                            m = reg.match(line)
666                            if m: state = st
667                rv = status.wait()
668
669            # If the experiment is not present the subcommand returns a
670            # non-zero return value.  If we successfully parsed a "none"
671            # outcome, ignore the return code.
672            if rv != 0 and state != 'none':
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            elif state not in ('active', 'swapped', 'swapping', 'none'):
677                raise service_error(service_error.internal,
678                        "Cannot get status of segment %s:%s/%s" % \
679                                (tb, pid, eid))
680            else: return state
681
682
683        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
684            """
685            Start a sub-experiment on a federant.
686
687            Get the current state, modify or create as appropriate, ship data
688            and configs and start the experiment.  There are small ordering
689            differences based on the initial state of the sub-experiment.
690            """
691            # ops node in the federant
692            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
693            user = tbparams[tb]['user']     # federant user
694            pid = tbparams[tb]['project']   # federant project
695            # XXX
696            base_confs = ( "hosts",)
697            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
698            # Configuration directories on the remote machine
699            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
700            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
701            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
702
703            state = self.get_state(user, host, tb, pid, eid)
704
705            self.log.debug("[start_segment]: %s: %s" % (tb, state))
706            self.log.info("[start_segment]:transferring experiment to %s" % tb)
707
708            if not self.scp_file("%s/%s/%s" % \
709                    (tmpdir, tb, tclfile), user, host):
710                return False
711           
712            if state == 'none':
713                # Create a null copy of the experiment so that we capture any
714                # logs there if the modify fails.  Emulab software discards the
715                # logs from a failed startexp
716                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
717                    return False
718                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
719                timedout = False
720                try:
721                    if not self.ssh_cmd(user, host,
722                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
723                            "-e %s null.tcl") % (pid, eid), "startexp",
724                            timeout=60 * 10):
725                        return False
726                except self.ssh_cmd_timeout:
727                    timedout = True
728
729                if timedout:
730                    state = self.get_state(user, host, tb, pid, eid)
731                    if state != "swapped":
732                        return False
733
734           
735            # Open up a temporary file to contain a script for setting up the
736            # filespace for the new experiment.
737            self.log.info("[start_segment]: creating script file")
738            try:
739                sf, scriptname = tempfile.mkstemp()
740                scriptfile = os.fdopen(sf, 'w')
741            except IOError:
742                return False
743
744            scriptbase = os.path.basename(scriptname)
745
746            # Script the filesystem changes
747            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
748            # Clear and create the tarfiles and rpm directories
749            for d in (tarfiles_dir, rpms_dir):
750                print >>scriptfile, "/bin/rm -rf %s/*" % d
751                print >>scriptfile, "mkdir -p %s" % d
752            print >>scriptfile, 'mkdir -p %s' % proj_dir
753            self.create_config_tree("%s/%s" % (tmpdir, tb),
754                    proj_dir, scriptfile)
755            if os.path.isdir("%s/tarfiles" % tmpdir):
756                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
757                        scriptfile)
758            if os.path.isdir("%s/rpms" % tmpdir):
759                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
760                        scriptfile)
761            print >>scriptfile, "rm -f %s" % scriptbase
762            scriptfile.close()
763
764            # Move the script to the remote machine
765            # XXX: could collide tempfile names on the remote host
766            if self.scp_file(scriptname, user, host, scriptbase):
767                os.remove(scriptname)
768            else:
769                return False
770
771            # Execute the script (and the script's last line deletes it)
772            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
773                return False
774
775            for f in base_confs:
776                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
777                        "%s/%s" % (proj_dir, f)):
778                    return False
779            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
780                    proj_dir):
781                return False
782            if os.path.isdir("%s/tarfiles" % tmpdir):
783                if not self.ship_configs(host, user,
784                        "%s/tarfiles" % tmpdir, tarfiles_dir):
785                    return False
786            if os.path.isdir("%s/rpms" % tmpdir):
787                if not self.ship_configs(host, user,
788                        "%s/rpms" % tmpdir, tarfiles_dir):
789                    return False
790            # Stage the new configuration (active experiments will stay swapped
791            # in now)
792            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
793            try:
794                if not self.ssh_cmd(user, host,
795                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
796                                (pid, eid, tclfile),
797                        "modexp", timeout= 60 * 10):
798                    return False
799            except self.ssh_cmd_timeout:
800                self.log.error("Modify command failed to complete in time")
801                # There's really no way to see if this succeeded or failed, so
802                # if it hangs, assume the worst.
803                return False
804            # Active experiments are still swapped, this swaps the others in.
805            if state != 'active':
806                self.log.info("[start_segment]: Swapping %s in on %s" % \
807                        (eid, tb))
808                timedout = False
809                try:
810                    if not self.ssh_cmd(user, host,
811                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
812                            "swapexp", timeout=10*60):
813                        return False
814                except self.ssh_cmd_timeout:
815                    timedout = True
816               
817                # If the command was terminated, but completed successfully,
818                # report success.
819                if timedout:
820                    self.log.debug("[start_segment]: swapin timed out " +\
821                            "checking state")
822                    state = self.get_state(user, host, tb, pid, eid)
823                    self.log.debug("[start_segment]: state is %s" % state)
824                    return state == 'active'
825            # Everything has gone OK.
826            return True
827
828    class stop_segment(emulab_segment):
829        def __init__(self, log=None, keyfile=None, debug=False):
830            experiment_control_local.emulab_segment.__init__(self,
831                    log=log, keyfile=keyfile, debug=debug)
832
833        def __call__(self, tb, eid, tbparams):
834            """
835            Stop a sub experiment by calling swapexp on the federant
836            """
837            user = tbparams[tb]['user']
838            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
839            pid = tbparams[tb]['project']
840
841            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
842            rv = False
843            try:
844                # Clean out tar files: we've gone over quota in the past
845                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
846                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
847                        (pid, eid))
848                rv = self.ssh_cmd(user, host,
849                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
850            except self.ssh_cmd_timeout:
851                rv = False
852            return rv
853
854       
855    def generate_ssh_keys(self, dest, type="rsa" ):
856        """
857        Generate a set of keys for the gateways to use to talk.
858
859        Keys are of type type and are stored in the required dest file.
860        """
861        valid_types = ("rsa", "dsa")
862        t = type.lower();
863        if t not in valid_types: raise ValueError
864        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
865
866        try:
867            trace = open("/dev/null", "w")
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Cannot open /dev/null??");
871
872        # May raise CalledProcessError
873        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
874        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
875        if rv != 0:
876            raise service_error(service_error.internal, 
877                    "Cannot generate nonce ssh keys.  %s return code %d" \
878                            % (self.ssh_keygen, rv))
879
880    def gentopo(self, str):
881        """
882        Generate the topology dtat structure from the splitter's XML
883        representation of it.
884
885        The topology XML looks like:
886            <experiment>
887                <nodes>
888                    <node><vname></vname><ips>ip1:ip2</ips></node>
889                </nodes>
890                <lans>
891                    <lan>
892                        <vname></vname><vnode></vnode><ip></ip>
893                        <bandwidth></bandwidth><member>node:port</member>
894                    </lan>
895                </lans>
896        """
897        class topo_parse:
898            """
899            Parse the topology XML and create the dats structure.
900            """
901            def __init__(self):
902                # Typing of the subelements for data conversion
903                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
904                self.int_subelements = ( 'bandwidth',)
905                self.float_subelements = ( 'delay',)
906                # The final data structure
907                self.nodes = [ ]
908                self.lans =  [ ]
909                self.topo = { \
910                        'node': self.nodes,\
911                        'lan' : self.lans,\
912                    }
913                self.element = { }  # Current element being created
914                self.chars = ""     # Last text seen
915
916            def end_element(self, name):
917                # After each sub element the contents is added to the current
918                # element or to the appropriate list.
919                if name == 'node':
920                    self.nodes.append(self.element)
921                    self.element = { }
922                elif name == 'lan':
923                    self.lans.append(self.element)
924                    self.element = { }
925                elif name in self.str_subelements:
926                    self.element[name] = self.chars
927                    self.chars = ""
928                elif name in self.int_subelements:
929                    self.element[name] = int(self.chars)
930                    self.chars = ""
931                elif name in self.float_subelements:
932                    self.element[name] = float(self.chars)
933                    self.chars = ""
934
935            def found_chars(self, data):
936                self.chars += data.rstrip()
937
938
939        tp = topo_parse();
940        parser = xml.parsers.expat.ParserCreate()
941        parser.EndElementHandler = tp.end_element
942        parser.CharacterDataHandler = tp.found_chars
943
944        parser.Parse(str)
945
946        return tp.topo
947       
948
949    def genviz(self, topo):
950        """
951        Generate the visualization the virtual topology
952        """
953
954        neato = "/usr/local/bin/neato"
955        # These are used to parse neato output and to create the visualization
956        # file.
957        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
958        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
959                "%s</type></node>"
960
961        try:
962            # Node names
963            nodes = [ n['vname'] for n in topo['node'] ]
964            topo_lans = topo['lan']
965        except KeyError:
966            raise service_error(service_error.internal, "Bad topology")
967
968        lans = { }
969        links = { }
970
971        # Walk through the virtual topology, organizing the connections into
972        # 2-node connections (links) and more-than-2-node connections (lans).
973        # When a lan is created, it's added to the list of nodes (there's a
974        # node in the visualization for the lan).
975        for l in topo_lans:
976            if links.has_key(l['vname']):
977                if len(links[l['vname']]) < 2:
978                    links[l['vname']].append(l['vnode'])
979                else:
980                    nodes.append(l['vname'])
981                    lans[l['vname']] = links[l['vname']]
982                    del links[l['vname']]
983                    lans[l['vname']].append(l['vnode'])
984            elif lans.has_key(l['vname']):
985                lans[l['vname']].append(l['vnode'])
986            else:
987                links[l['vname']] = [ l['vnode'] ]
988
989
990        # Open up a temporary file for dot to turn into a visualization
991        try:
992            df, dotname = tempfile.mkstemp()
993            dotfile = os.fdopen(df, 'w')
994        except IOError:
995            raise service_error(service_error.internal,
996                    "Failed to open file in genviz")
997
998        # Generate a dot/neato input file from the links, nodes and lans
999        try:
1000            print >>dotfile, "graph G {"
1001            for n in nodes:
1002                print >>dotfile, '\t"%s"' % n
1003            for l in links.keys():
1004                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1005            for l in lans.keys():
1006                for n in lans[l]:
1007                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1008            print >>dotfile, "}"
1009            dotfile.close()
1010        except TypeError:
1011            raise service_error(service_error.internal,
1012                    "Single endpoint link in vtopo")
1013        except IOError:
1014            raise service_error(service_error.internal, "Cannot write dot file")
1015
1016        # Use dot to create a visualization
1017        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1018                '-Gpack=true', dotname], stdout=PIPE, close_fds=True)
1019
1020        # Translate dot to vis format
1021        vis_nodes = [ ]
1022        vis = { 'node': vis_nodes }
1023        for line in dot.stdout:
1024            m = vis_re.match(line)
1025            if m:
1026                vn = m.group(1)
1027                vis_node = {'name': vn, \
1028                        'x': float(m.group(2)),\
1029                        'y' : float(m.group(3)),\
1030                    }
1031                if vn in links.keys() or vn in lans.keys():
1032                    vis_node['type'] = 'lan'
1033                else:
1034                    vis_node['type'] = 'node'
1035                vis_nodes.append(vis_node)
1036        rv = dot.wait()
1037
1038        os.remove(dotname)
1039        if rv == 0 : return vis
1040        else: return None
1041
1042    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1043            access_user):
1044        """
1045        Get access to testbed through fedd and set the parameters for that tb
1046        """
1047        uri = self.tbmap.get(tb, None)
1048        if not uri:
1049            raise service_error(serice_error.server_config, 
1050                    "Unknown testbed: %s" % tb)
1051
1052        # currently this lumps all users into one service access group
1053        service_keys = [ a for u in user \
1054                for a in u.get('access', []) \
1055                    if a.has_key('sshPubkey')]
1056
1057        if len(service_keys) == 0:
1058            raise service_error(service_error.req, 
1059                    "Must have at least one SSH pubkey for services")
1060
1061
1062        for p, u in access_user:
1063            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1064                    "to %s") %  ((p or "None"), u, uri))
1065
1066            if p:
1067                # Request with user and project specified
1068                req = {\
1069                        'destinationTestbed' : { 'uri' : uri },
1070                        'project': { 
1071                            'name': {'localname': p},
1072                            'user': [ {'userID': { 'localname': u } } ],
1073                            },
1074                        'user':  user,
1075                        'allocID' : { 'localname': 'test' },
1076                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1077                        'serviceAccess' : service_keys
1078                    }
1079            else:
1080                # Request with only user specified
1081                req = {\
1082                        'destinationTestbed' : { 'uri' : uri },
1083                        'user':  [ {'userID': { 'localname': u } } ],
1084                        'allocID' : { 'localname': 'test' },
1085                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1086                        'serviceAccess' : service_keys
1087                    }
1088
1089            if tb == master:
1090                # NB, the export_project parameter is a dict that includes
1091                # the type
1092                req['exportProject'] = export_project
1093
1094            # node resources if any
1095            if nodes != None and len(nodes) > 0:
1096                rnodes = [ ]
1097                for n in nodes:
1098                    rn = { }
1099                    image, hw, count = n.split(":")
1100                    if image: rn['image'] = [ image ]
1101                    if hw: rn['hardware'] = [ hw ]
1102                    if count and int(count) >0 : rn['count'] = int(count)
1103                    rnodes.append(rn)
1104                req['resources']= { }
1105                req['resources']['node'] = rnodes
1106
1107            try:
1108                if self.local_access.has_key(uri):
1109                    # Local access call
1110                    req = { 'RequestAccessRequestBody' : req }
1111                    r = self.local_access[uri].RequestAccess(req, 
1112                            fedid(file=self.cert_file))
1113                    r = { 'RequestAccessResponseBody' : r }
1114                else:
1115                    r = self.call_RequestAccess(uri, req, 
1116                            self.cert_file, self.cert_pwd, self.trusted_certs)
1117            except service_error, e:
1118                if e.code == service_error.access:
1119                    self.log.debug("[get_access] Access denied")
1120                    r = None
1121                    continue
1122                else:
1123                    raise e
1124
1125            if r.has_key('RequestAccessResponseBody'):
1126                # Through to here we have a valid response, not a fault.
1127                # Access denied is a fault, so something better or worse than
1128                # access denied has happened.
1129                r = r['RequestAccessResponseBody']
1130                self.log.debug("[get_access] Access granted")
1131                break
1132            else:
1133                raise service_error(service_error.protocol,
1134                        "Bad proxy response")
1135       
1136        if not r:
1137            raise service_error(service_error.access, 
1138                    "Access denied by %s (%s)" % (tb, uri))
1139
1140        e = r['emulab']
1141        p = e['project']
1142        tbparam[tb] = { 
1143                "boss": e['boss'],
1144                "host": e['ops'],
1145                "domain": e['domain'],
1146                "fs": e['fileServer'],
1147                "eventserver": e['eventServer'],
1148                "project": unpack_id(p['name']),
1149                "emulab" : e,
1150                "allocID" : r['allocID'],
1151                }
1152        # Make the testbed name be the label the user applied
1153        p['testbed'] = {'localname': tb }
1154
1155        for u in p['user']:
1156            role = u.get('role', None)
1157            if role == 'experimentCreation':
1158                tbparam[tb]['user'] = unpack_id(u['userID'])
1159                break
1160        else:
1161            raise service_error(service_error.internal, 
1162                    "No createExperimentUser from %s" %tb)
1163
1164        # Add attributes to barameter space.  We don't allow attributes to
1165        # overlay any parameters already installed.
1166        for a in e['fedAttr']:
1167            try:
1168                if a['attribute'] and isinstance(a['attribute'], basestring)\
1169                        and not tbparam[tb].has_key(a['attribute'].lower()):
1170                    tbparam[tb][a['attribute'].lower()] = a['value']
1171            except KeyError:
1172                self.log.error("Bad attribute in response: %s" % a)
1173       
1174    def release_access(self, tb, aid):
1175        """
1176        Release access to testbed through fedd
1177        """
1178
1179        uri = self.tbmap.get(tb, None)
1180        if not uri:
1181            raise service_error(serice_error.server_config, 
1182                    "Unknown testbed: %s" % tb)
1183
1184        if self.local_access.has_key(uri):
1185            resp = self.local_access[uri].ReleaseAccess(\
1186                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1187                    fedid(file=self.cert_file))
1188            resp = { 'ReleaseAccessResponseBody': resp } 
1189        else:
1190            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1191                    self.cert_file, self.cert_pwd, self.trusted_certs)
1192
1193        # better error coding
1194
1195    def remote_splitter(self, uri, desc, master):
1196
1197        req = {
1198                'description' : { 'ns2description': desc },
1199                'master': master,
1200                'include_fedkit': bool(self.fedkit),
1201                'include_gatewaykit': bool(self.gatewaykit)
1202            }
1203
1204        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1205                self.trusted_certs)
1206
1207        if r.has_key('Ns2SplitResponseBody'):
1208            r = r['Ns2SplitResponseBody']
1209            if r.has_key('output'):
1210                return r['output'].splitlines()
1211            else:
1212                raise service_error(service_error.protocol, 
1213                        "Bad splitter response (no output)")
1214        else:
1215            raise service_error(service_error.protocol, "Bad splitter response")
1216       
1217    class current_testbed:
1218        """
1219        Object for collecting the current testbed description.  The testbed
1220        description is saved to a file with the local testbed variables
1221        subsittuted line by line.
1222        """
1223        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1224            def tar_list_to_string(tl):
1225                if tl is None: return None
1226
1227                rv = ""
1228                for t in tl:
1229                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1230                            (t[0], os.path.basename(t[1]))
1231                return rv
1232
1233
1234            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1235            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1236            self.current_testbed = None
1237            self.testbed_file = None
1238
1239            self.def_expstart = \
1240                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1241            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1242            self.def_gwstart = \
1243                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1244            self.def_mgwstart = \
1245                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1246            self.def_gwimage = "FBSD61-TUNNEL2";
1247            self.def_gwtype = "pc";
1248            self.def_mgwcmd = '# '
1249            self.def_mgwcmdparams = ''
1250            self.def_gwcmd = '# '
1251            self.def_gwcmdparams = ''
1252
1253            self.eid = eid
1254            self.tmpdir = tmpdir
1255            # Convert fedkit and gateway kit (which are lists of tuples) into a
1256            # substituition string.
1257            self.fedkit = tar_list_to_string(fedkit)
1258            self.gatewaykit = tar_list_to_string(gatewaykit)
1259
1260        def __call__(self, line, master, allocated, tbparams):
1261            # Capture testbed topology descriptions
1262            if self.current_testbed == None:
1263                m = self.begin_testbed.match(line)
1264                if m != None:
1265                    self.current_testbed = m.group(1)
1266                    if self.current_testbed == None:
1267                        raise service_error(service_error.req,
1268                                "Bad request format (unnamed testbed)")
1269                    allocated[self.current_testbed] = \
1270                            allocated.get(self.current_testbed,0) + 1
1271                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1272                    if not os.path.exists(tb_dir):
1273                        try:
1274                            os.mkdir(tb_dir)
1275                        except IOError:
1276                            raise service_error(service_error.internal,
1277                                    "Cannot create %s" % tb_dir)
1278                    try:
1279                        self.testbed_file = open("%s/%s.%s.tcl" %
1280                                (tb_dir, self.eid, self.current_testbed), 'w')
1281                    except IOError:
1282                        self.testbed_file = None
1283                    return True
1284                else: return False
1285            else:
1286                m = self.end_testbed.match(line)
1287                if m != None:
1288                    if m.group(1) != self.current_testbed:
1289                        raise service_error(service_error.internal, 
1290                                "Mismatched testbed markers!?")
1291                    if self.testbed_file != None: 
1292                        self.testbed_file.close()
1293                        self.testbed_file = None
1294                    self.current_testbed = None
1295                elif self.testbed_file:
1296                    # Substitute variables and put the line into the local
1297                    # testbed file.
1298                    gwtype = tbparams[self.current_testbed].get(\
1299                            'connectortype', self.def_gwtype)
1300                    gwimage = tbparams[self.current_testbed].get(\
1301                            'connectorimage', self.def_gwimage)
1302                    mgwstart = tbparams[self.current_testbed].get(\
1303                            'masterconnectorstartcmd', self.def_mgwstart)
1304                    mexpstart = tbparams[self.current_testbed].get(\
1305                            'masternodestartcmd', self.def_mexpstart)
1306                    gwstart = tbparams[self.current_testbed].get(\
1307                            'slaveconnectorstartcmd', self.def_gwstart)
1308                    expstart = tbparams[self.current_testbed].get(\
1309                            'slavenodestartcmd', self.def_expstart)
1310                    project = tbparams[self.current_testbed].get('project')
1311                    gwcmd = tbparams[self.current_testbed].get(\
1312                            'slaveconnectorcmd', self.def_gwcmd)
1313                    gwcmdparams = tbparams[self.current_testbed].get(\
1314                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1315                    mgwcmd = tbparams[self.current_testbed].get(\
1316                            'masterconnectorcmd', self.def_gwcmd)
1317                    mgwcmdparams = tbparams[self.current_testbed].get(\
1318                            'masterconnectorcmdparams', self.def_gwcmdparams)
1319                    line = re.sub("GWTYPE", gwtype, line)
1320                    line = re.sub("GWIMAGE", gwimage, line)
1321                    if self.current_testbed == master:
1322                        line = re.sub("GWSTART", mgwstart, line)
1323                        line = re.sub("EXPSTART", mexpstart, line)
1324                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1325                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1326                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1327                    else:
1328                        line = re.sub("GWSTART", gwstart, line)
1329                        line = re.sub("EXPSTART", expstart, line)
1330                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1331                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1332                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1333                    #These expansions contain EID and PROJDIR.  NB these are
1334                    # local fedkit and gatewaykit, which are strings.
1335                    if self.fedkit:
1336                        line = re.sub("FEDKIT", self.fedkit, line)
1337                    if self.gatewaykit:
1338                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1339                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1340                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1341                    line = re.sub("EID", self.eid, line)
1342                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1343                            (project, self.eid), line)
1344                    print >>self.testbed_file, line
1345                return True
1346
1347    class allbeds:
1348        """
1349        Process the Allbeds section.  Get access to each federant and save the
1350        parameters in tbparams
1351        """
1352        def __init__(self, get_access):
1353            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1354            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1355            self.in_allbeds = False
1356            self.get_access = get_access
1357
1358        def __call__(self, line, user, tbparams, master, export_project,
1359                access_user):
1360            # Testbed access parameters
1361            if not self.in_allbeds:
1362                if self.begin_allbeds.match(line):
1363                    self.in_allbeds = True
1364                    return True
1365                else:
1366                    return False
1367            else:
1368                if self.end_allbeds.match(line):
1369                    self.in_allbeds = False
1370                else:
1371                    nodes = line.split('|')
1372                    tb = nodes.pop(0)
1373                    self.get_access(tb, nodes, user, tbparams, master,
1374                            export_project, access_user)
1375                return True
1376
1377    class gateways:
1378        def __init__(self, eid, master, tmpdir, gw_pubkey,
1379                gw_secretkey, copy_file, fedkit):
1380            self.begin_gateways = \
1381                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1382            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1383            self.current_gateways = None
1384            self.control_gateway = None
1385            self.active_end = { }
1386
1387            self.eid = eid
1388            self.master = master
1389            self.tmpdir = tmpdir
1390            self.gw_pubkey_base = gw_pubkey
1391            self.gw_secretkey_base = gw_secretkey
1392
1393            self.copy_file = copy_file
1394            self.fedkit = fedkit
1395
1396
1397        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1398                active_end, tbparams, dtb, myname, desthost, type):
1399            """
1400            Produce a gateway configuration file from a gateways line.
1401            """
1402
1403            sproject = tbparams[gw].get('project', 'project')
1404            dproject = tbparams[dtb].get('project', 'project')
1405            sdomain = ".%s.%s%s" % (eid, sproject,
1406                    tbparams[gw].get('domain', ".example.com"))
1407            ddomain = ".%s.%s%s" % (eid, dproject,
1408                    tbparams[dtb].get('domain', ".example.com"))
1409            boss = tbparams[master].get('boss', "boss")
1410            fs = tbparams[master].get('fs', "fs")
1411            event_server = "%s%s" % \
1412                    (tbparams[gw].get('eventserver', "event_server"),
1413                            tbparams[gw].get('domain', "example.com"))
1414            remote_event_server = "%s%s" % \
1415                    (tbparams[dtb].get('eventserver', "event_server"),
1416                            tbparams[dtb].get('domain', "example.com"))
1417            seer_control = "%s%s" % \
1418                    (tbparams[gw].get('control', "control"), sdomain)
1419            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1420
1421            if self.fedkit:
1422                remote_script_dir = "/usr/local/federation/bin"
1423                local_script_dir = "/usr/local/federation/bin"
1424            else:
1425                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1426                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1427
1428            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1429            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1430            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1431
1432            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1433            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1434
1435            # translate to lower case so the `hostname` hack for specifying
1436            # configuration files works.
1437            conf_file = conf_file.lower();
1438            remote_conf_file = remote_conf_file.lower();
1439
1440            if dtb == master:
1441                active = "false"
1442            elif gw == master:
1443                active = "true"
1444            elif active_end.has_key('%s-%s' % (dtb, gw)):
1445                active = "false"
1446            else:
1447                active_end['%s-%s' % (gw, dtb)] = 1
1448                active = "true"
1449
1450            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1451            print >>gwconfig, "Active: %s" % active
1452            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1453            if tunnel_iface:
1454                print >>gwconfig, "Interface: %s" % tunnel_iface
1455            print >>gwconfig, "BossName: %s" % boss
1456            print >>gwconfig, "FsName: %s" % fs
1457            print >>gwconfig, "EventServerName: %s" % event_server
1458            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1459            print >>gwconfig, "SeerControl: %s" % seer_control
1460            print >>gwconfig, "Type: %s" % type
1461            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1462            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1463                    local_script_dir
1464            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1465            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1466            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1467                    (remote_conf_dir, remote_conf_file)
1468            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1469            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1470            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1471            gwconfig.close()
1472
1473            return active == "true"
1474
1475        def __call__(self, line, allocated, tbparams):
1476            # Process gateways
1477            if not self.current_gateways:
1478                m = self.begin_gateways.match(line)
1479                if m:
1480                    self.current_gateways = m.group(1)
1481                    if allocated.has_key(self.current_gateways):
1482                        # This test should always succeed
1483                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1484                        if not os.path.exists(tb_dir):
1485                            try:
1486                                os.mkdir(tb_dir)
1487                            except IOError:
1488                                raise service_error(service_error.internal,
1489                                        "Cannot create %s" % tb_dir)
1490                    else:
1491                        # XXX
1492                        self.log.error("[gateways]: Ignoring gateways for " + \
1493                                "unknown testbed %s" % self.current_gateways)
1494                        self.current_gateways = None
1495                    return True
1496                else:
1497                    return False
1498            else:
1499                m = self.end_gateways.match(line)
1500                if m :
1501                    if m.group(1) != self.current_gateways:
1502                        raise service_error(service_error.internal,
1503                                "Mismatched gateway markers!?")
1504                    if self.control_gateway:
1505                        try:
1506                            cc = open("%s/%s/client.conf" %
1507                                    (self.tmpdir, self.current_gateways), 'w')
1508                            print >>cc, "ControlGateway: %s" % \
1509                                    self.control_gateway
1510                            if tbparams[self.master].has_key('smbshare'):
1511                                print >>cc, "SMBSHare: %s" % \
1512                                        tbparams[self.master]['smbshare']
1513                            print >>cc, "ProjectUser: %s" % \
1514                                    tbparams[self.master]['user']
1515                            print >>cc, "ProjectName: %s" % \
1516                                    tbparams[self.master]['project']
1517                            print >>cc, "ExperimentID: %s/%s" % \
1518                                    ( tbparams[self.master]['project'], \
1519                                    self.eid )
1520                            cc.close()
1521                        except IOError:
1522                            raise service_error(service_error.internal,
1523                                    "Error creating client config")
1524                        # XXX: This seer specific file should disappear
1525                        try:
1526                            cc = open("%s/%s/seer.conf" %
1527                                    (self.tmpdir, self.current_gateways),
1528                                    'w')
1529                            if self.current_gateways != self.master:
1530                                print >>cc, "ControlNode: %s" % \
1531                                        self.control_gateway
1532                            print >>cc, "ExperimentID: %s/%s" % \
1533                                    ( tbparams[self.master]['project'], \
1534                                    self.eid )
1535                            cc.close()
1536                        except IOError:
1537                            raise service_error(service_error.internal,
1538                                    "Error creating seer config")
1539                    else:
1540                        debug.error("[gateways]: No control gateway for %s" %\
1541                                    self.current_gateways)
1542                    self.current_gateways = None
1543                else:
1544                    dtb, myname, desthost, type = line.split(" ")
1545
1546                    if type == "control" or type == "both":
1547                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1548                                self.eid, 
1549                                tbparams[self.current_gateways]['project'],
1550                                tbparams[self.current_gateways]['domain'])
1551                    try:
1552                        active = self.gateway_conf_file(self.current_gateways,
1553                                self.master, self.eid, self.gw_pubkey_base,
1554                                self.gw_secretkey_base,
1555                                self.active_end, tbparams, dtb, myname,
1556                                desthost, type)
1557                    except IOError, e:
1558                        raise service_error(service_error.internal,
1559                                "Failed to write config file for %s" % \
1560                                        self.current_gateway)
1561           
1562                    gw_pubkey = "%s/keys/%s" % \
1563                            (self.tmpdir, self.gw_pubkey_base)
1564                    gw_secretkey = "%s/keys/%s" % \
1565                            (self.tmpdir, self.gw_secretkey_base)
1566
1567                    pkfile = "%s/%s/%s" % \
1568                            ( self.tmpdir, self.current_gateways, 
1569                                    self.gw_pubkey_base)
1570                    skfile = "%s/%s/%s" % \
1571                            ( self.tmpdir, self.current_gateways, 
1572                                    self.gw_secretkey_base)
1573
1574                    if not os.path.exists(pkfile):
1575                        try:
1576                            self.copy_file(gw_pubkey, pkfile)
1577                        except IOError:
1578                            service_error(service_error.internal,
1579                                    "Failed to copy pubkey file")
1580
1581                    if active and not os.path.exists(skfile):
1582                        try:
1583                            self.copy_file(gw_secretkey, skfile)
1584                        except IOError:
1585                            service_error(service_error.internal,
1586                                    "Failed to copy secretkey file")
1587                return True
1588
1589    class shunt_to_file:
1590        """
1591        Simple class to write data between two regexps to a file.
1592        """
1593        def __init__(self, begin, end, filename):
1594            """
1595            Begin shunting on a match of begin, stop on end, send data to
1596            filename.
1597            """
1598            self.begin = re.compile(begin)
1599            self.end = re.compile(end)
1600            self.in_shunt = False
1601            self.file = None
1602            self.filename = filename
1603
1604        def __call__(self, line):
1605            """
1606            Call this on each line in the input that may be shunted.
1607            """
1608            if not self.in_shunt:
1609                if self.begin.match(line):
1610                    self.in_shunt = True
1611                    try:
1612                        self.file = open(self.filename, "w")
1613                    except:
1614                        self.file = None
1615                        raise
1616                    return True
1617                else:
1618                    return False
1619            else:
1620                if self.end.match(line):
1621                    if self.file: 
1622                        self.file.close()
1623                        self.file = None
1624                    self.in_shunt = False
1625                else:
1626                    if self.file:
1627                        print >>self.file, line
1628                return True
1629
1630    class shunt_to_list:
1631        """
1632        Same interface as shunt_to_file.  Data collected in self.list, one list
1633        element per line.
1634        """
1635        def __init__(self, begin, end):
1636            self.begin = re.compile(begin)
1637            self.end = re.compile(end)
1638            self.in_shunt = False
1639            self.list = [ ]
1640       
1641        def __call__(self, line):
1642            if not self.in_shunt:
1643                if self.begin.match(line):
1644                    self.in_shunt = True
1645                    return True
1646                else:
1647                    return False
1648            else:
1649                if self.end.match(line):
1650                    self.in_shunt = False
1651                else:
1652                    self.list.append(line)
1653                return True
1654
1655    class shunt_to_string:
1656        """
1657        Same interface as shunt_to_file.  Data collected in self.str, all in
1658        one string.
1659        """
1660        def __init__(self, begin, end):
1661            self.begin = re.compile(begin)
1662            self.end = re.compile(end)
1663            self.in_shunt = False
1664            self.str = ""
1665       
1666        def __call__(self, line):
1667            if not self.in_shunt:
1668                if self.begin.match(line):
1669                    self.in_shunt = True
1670                    return True
1671                else:
1672                    return False
1673            else:
1674                if self.end.match(line):
1675                    self.in_shunt = False
1676                else:
1677                    self.str += line
1678                return True
1679
1680    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1681            tbparams, tmpdir, alloc_log=None):
1682        started = { }           # Testbeds where a sub-experiment started
1683                                # successfully
1684
1685        # XXX
1686        fail_soft = False
1687
1688        log = alloc_log or self.log
1689
1690        thread_pool = self.thread_pool(self.nthreads)
1691        threads = [ ]
1692
1693        for tb in [ k for k in allocated.keys() if k != master]:
1694            # Create and start a thread to start the segment, and save it to
1695            # get the return value later
1696            thread_pool.wait_for_slot()
1697            t  = self.pooled_thread(\
1698                    target=self.start_segment(log=log,
1699                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1700                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1701                    pdata=thread_pool, trace_file=self.trace_file)
1702            threads.append(t)
1703            t.start()
1704
1705        # Wait until all finish
1706        thread_pool.wait_for_all_done()
1707
1708        # If none failed, start the master
1709        failed = [ t.getName() for t in threads if not t.rv ]
1710
1711        if len(failed) == 0:
1712            starter = self.start_segment(log=log, 
1713                    keyfile=self.ssh_privkey_file, debug=self.debug)
1714            if not starter(master, eid, tbparams, tmpdir):
1715                failed.append(master)
1716
1717        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1718        # If one failed clean up, unless fail_soft is set
1719        if failed:
1720            if not fail_soft:
1721                thread_pool.clear()
1722                for tb in succeeded:
1723                    # Create and start a thread to stop the segment
1724                    thread_pool.wait_for_slot()
1725                    t  = self.pooled_thread(\
1726                            target=self.stop_segment(log=log,
1727                                keyfile=self.ssh_privkey_file,
1728                                debug=self.debug), 
1729                            args=(tb, eid, tbparams), name=tb,
1730                            pdata=thread_pool, trace_file=self.trace_file)
1731                    t.start()
1732                # Wait until all finish
1733                thread_pool.wait_for_all_done()
1734
1735                # release the allocations
1736                for tb in tbparams.keys():
1737                    self.release_access(tb, tbparams[tb]['allocID'])
1738                # Remove the placeholder
1739                self.state_lock.acquire()
1740                self.state[eid]['experimentStatus'] = 'failed'
1741                if self.state_filename: self.write_state()
1742                self.state_lock.release()
1743
1744                #raise service_error(service_error.federant,
1745                #    "Swap in failed on %s" % ",".join(failed))
1746                log.error("Swap in failed on %s" % ",".join(failed))
1747                return
1748        else:
1749            log.info("[start_segment]: Experiment %s active" % eid)
1750
1751        log.debug("[start_experiment]: removing %s" % tmpdir)
1752
1753        # Walk up tmpdir, deleting as we go
1754        for path, dirs, files in os.walk(tmpdir, topdown=False):
1755            for f in files:
1756                os.remove(os.path.join(path, f))
1757            for d in dirs:
1758                os.rmdir(os.path.join(path, d))
1759        os.rmdir(tmpdir)
1760
1761        # Insert the experiment into our state and update the disk copy
1762        self.state_lock.acquire()
1763        self.state[expid]['experimentStatus'] = 'active'
1764        self.state[eid] = self.state[expid]
1765        if self.state_filename: self.write_state()
1766        self.state_lock.release()
1767        return
1768
1769    def create_experiment(self, req, fid):
1770        """
1771        The external interface to experiment creation called from the
1772        dispatcher.
1773
1774        Creates a working directory, splits the incoming description using the
1775        splitter script and parses out the avrious subsections using the
1776        lcasses above.  Once each sub-experiment is created, use pooled threads
1777        to instantiate them and start it all up.
1778        """
1779
1780        if not self.auth.check_attribute(fid, 'create'):
1781            raise service_error(service_error.access, "Create access denied")
1782
1783        try:
1784            tmpdir = tempfile.mkdtemp(prefix="split-")
1785        except IOError:
1786            raise service_error(service_error.internal, "Cannot create tmp dir")
1787
1788        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1789        gw_secretkey_base = "fed.%s" % self.ssh_type
1790        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1791        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1792        tclfile = tmpdir + "/experiment.tcl"
1793        tbparams = { }
1794        try:
1795            access_user = self.accessdb[fid]
1796        except KeyError:
1797            raise service_error(service_error.internal,
1798                    "Access map and authorizer out of sync in " + \
1799                            "create_experiment for fedid %s"  % fid)
1800
1801        pid = "dummy"
1802        gid = "dummy"
1803        try:
1804            os.mkdir(tmpdir+"/keys")
1805        except OSError:
1806            raise service_error(service_error.internal,
1807                    "Can't make temporary dir")
1808
1809        req = req.get('CreateRequestBody', None)
1810        if not req:
1811            raise service_error(service_error.req,
1812                    "Bad request format (no CreateRequestBody)")
1813        # The tcl parser needs to read a file so put the content into that file
1814        descr=req.get('experimentdescription', None)
1815        if descr:
1816            file_content=descr.get('ns2description', None)
1817            if file_content:
1818                try:
1819                    f = open(tclfile, 'w')
1820                    f.write(file_content)
1821                    f.close()
1822                except IOError:
1823                    raise service_error(service_error.internal,
1824                            "Cannot write temp experiment description")
1825            else:
1826                raise service_error(service_error.req, 
1827                        "Only ns2descriptions supported")
1828        else:
1829            raise service_error(service_error.req, "No experiment description")
1830
1831        # Generate an ID for the experiment (slice) and a certificate that the
1832        # allocator can use to prove they own it.  We'll ship it back through
1833        # the encrypted connection.
1834        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1835
1836        if req.has_key('experimentID') and \
1837                req['experimentID'].has_key('localname'):
1838            overwrite = False
1839            eid = req['experimentID']['localname']
1840            # If there's an old failed experiment here with the same local name
1841            # and accessible by this user, we'll overwrite it, otherwise we'll
1842            # fall through and do the collision avoidance.
1843            old_expid = self.get_experiment_fedid(eid)
1844            if old_expid and self.check_experiment_access(fid, old_expid):
1845                self.state_lock.acquire()
1846                status = self.state[eid].get('experimentStatus', None)
1847                if status and status == 'failed':
1848                    # remove the old access attribute
1849                    self.auth.unset_attribute(fid, old_expid)
1850                    overwrite = True
1851                    del self.state[eid]
1852                    del self.state[old_expid]
1853                self.state_lock.release()
1854            self.state_lock.acquire()
1855            while (self.state.has_key(eid) and not overwrite):
1856                eid += random.choice(string.ascii_letters)
1857            # Initial state
1858            self.state[eid] = {
1859                    'experimentID' : \
1860                            [ { 'localname' : eid }, {'fedid': expid } ],
1861                    'experimentStatus': 'starting',
1862                    'experimentAccess': { 'X509' : expcert },
1863                    'owner': fid,
1864                    'log' : [],
1865                }
1866            self.state[expid] = self.state[eid]
1867            if self.state_filename: self.write_state()
1868            self.state_lock.release()
1869        else:
1870            eid = self.exp_stem
1871            for i in range(0,5):
1872                eid += random.choice(string.ascii_letters)
1873            self.state_lock.acquire()
1874            while (self.state.has_key(eid)):
1875                eid = self.exp_stem
1876                for i in range(0,5):
1877                    eid += random.choice(string.ascii_letters)
1878            # Initial state
1879            self.state[eid] = {
1880                    'experimentID' : \
1881                            [ { 'localname' : eid }, {'fedid': expid } ],
1882                    'experimentStatus': 'starting',
1883                    'experimentAccess': { 'X509' : expcert },
1884                    'owner': fid,
1885                    'log' : [],
1886                }
1887            self.state[expid] = self.state[eid]
1888            if self.state_filename: self.write_state()
1889            self.state_lock.release()
1890
1891        try: 
1892            # This catches exceptions to clear the placeholder if necessary
1893            try:
1894                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1895            except ValueError:
1896                raise service_error(service_error.server_config, 
1897                        "Bad key type (%s)" % self.ssh_type)
1898
1899            user = req.get('user', None)
1900            if user == None:
1901                raise service_error(service_error.req, "No user")
1902
1903            master = req.get('master', None)
1904            if not master:
1905                raise service_error(service_error.req,
1906                        "No master testbed label")
1907            export_project = req.get('exportProject', None)
1908            if not export_project:
1909                raise service_error(service_error.req, "No export project")
1910           
1911            if self.splitter_url:
1912                self.log.debug("Calling remote splitter at %s" % \
1913                        self.splitter_url)
1914                split_data = self.remote_splitter(self.splitter_url,
1915                        file_content, master)
1916            else:
1917                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1918                    str(self.muxmax), '-m', master]
1919
1920                if self.fedkit:
1921                    tclcmd.append('-k')
1922
1923                if self.gatewaykit:
1924                    tclcmd.append('-K')
1925
1926                tclcmd.extend([pid, gid, eid, tclfile])
1927
1928                self.log.debug("running local splitter %s", " ".join(tclcmd))
1929                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True)
1930                split_data = tclparser.stdout
1931
1932            allocated = { }         # Testbeds we can access
1933            # Objects to parse the splitter output (defined above)
1934            parse_current_testbed = self.current_testbed(eid, tmpdir,
1935                    self.fedkit, self.gatewaykit)
1936            parse_allbeds = self.allbeds(self.get_access)
1937            parse_gateways = self.gateways(eid, master, tmpdir,
1938                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1939                    self.fedkit)
1940            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1941                        "^#\s+End\s+Vtopo")
1942            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1943                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1944            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1945                    "^#\s+End\s+tarfiles")
1946            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1947                    "^#\s+End\s+rpms")
1948
1949            # Working on the split data
1950            for line in split_data:
1951                line = line.rstrip()
1952                if parse_current_testbed(line, master, allocated, tbparams):
1953                    continue
1954                elif parse_allbeds(line, user, tbparams, master, export_project,
1955                        access_user):
1956                    continue
1957                elif parse_gateways(line, allocated, tbparams):
1958                    continue
1959                elif parse_vtopo(line):
1960                    continue
1961                elif parse_hostnames(line):
1962                    continue
1963                elif parse_tarfiles(line):
1964                    continue
1965                elif parse_rpms(line):
1966                    continue
1967                else:
1968                    raise service_error(service_error.internal, 
1969                            "Bad tcl parse? %s" % line)
1970            # Virtual topology and visualization
1971            vtopo = self.gentopo(parse_vtopo.str)
1972            if not vtopo:
1973                raise service_error(service_error.internal, 
1974                        "Failed to generate virtual topology")
1975
1976            vis = self.genviz(vtopo)
1977            if not vis:
1978                raise service_error(service_error.internal, 
1979                        "Failed to generate visualization")
1980
1981           
1982            # save federant information
1983            for k in allocated.keys():
1984                tbparams[k]['federant'] = {\
1985                        'name': [ { 'localname' : eid} ],\
1986                        'emulab': tbparams[k]['emulab'],\
1987                        'allocID' : tbparams[k]['allocID'],\
1988                        'master' : k == master,\
1989                    }
1990
1991            self.state_lock.acquire()
1992            self.state[eid]['vtopo'] = vtopo
1993            self.state[eid]['vis'] = vis
1994            self.state[expid]['federant'] = \
1995                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1996                        if tbparams[tb].has_key('federant') ]
1997            if self.state_filename: self.write_state()
1998            self.state_lock.release()
1999
2000            # Copy tarfiles and rpms needed at remote sites into a staging area
2001            try:
2002                if self.fedkit:
2003                    for t in self.fedkit:
2004                        parse_tarfiles.list.append(t[1])
2005                if self.gatewaykit:
2006                    for t in self.gatewaykit:
2007                        parse_tarfiles.list.append(t[1])
2008                for t in parse_tarfiles.list:
2009                    if not os.path.exists("%s/tarfiles" % tmpdir):
2010                        os.mkdir("%s/tarfiles" % tmpdir)
2011                    self.copy_file(t, "%s/tarfiles/%s" % \
2012                            (tmpdir, os.path.basename(t)))
2013                for r in parse_rpms.list:
2014                    if not os.path.exists("%s/rpms" % tmpdir):
2015                        os.mkdir("%s/rpms" % tmpdir)
2016                    self.copy_file(r, "%s/rpms/%s" % \
2017                            (tmpdir, os.path.basename(r)))
2018                # A null experiment file in case we need to create a remote
2019                # experiment from scratch
2020                f = open("%s/null.tcl" % tmpdir, "w")
2021                print >>f, """
2022set ns [new Simulator]
2023source tb_compat.tcl
2024
2025set a [$ns node]
2026
2027$ns rtproto Session
2028$ns run
2029"""
2030                f.close()
2031
2032            except IOError, e:
2033                raise service_error(service_error.internal, 
2034                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2035
2036        except service_error, e:
2037            # If something goes wrong in the parse (usually an access error)
2038            # clear the placeholder state.  From here on out the code delays
2039            # exceptions.  Failing at this point returns a fault to the remote
2040            # caller.
2041            self.state_lock.acquire()
2042            del self.state[eid]
2043            del self.state[expid]
2044            if self.state_filename: self.write_state()
2045            self.state_lock.release()
2046            raise e
2047
2048
2049        # Start the background swapper and return the starting state.  From
2050        # here on out, the state will stick around a while.
2051
2052        # Let users touch the state
2053        self.auth.set_attribute(fid, expid)
2054        self.auth.set_attribute(expid, expid)
2055        # Override fedids can manipulate state as well
2056        for o in self.overrides:
2057            self.auth.set_attribute(o, expid)
2058
2059        # Create a logger that logs to the experiment's state object as well as
2060        # to the main log file.
2061        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2062        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2063        # XXX: there should be a global one of these rather than repeating the
2064        # code.
2065        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2066                    '%d %b %y %H:%M:%S'))
2067        alloc_log.addHandler(h)
2068       
2069        # Start a thread to do the resource allocation
2070        t  = Thread(target=self.allocate_resources,
2071                args=(allocated, master, eid, expid, expcert, tbparams, 
2072                    tmpdir, alloc_log),
2073                name=eid)
2074        t.start()
2075
2076        rv = {
2077                'experimentID': [
2078                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2079                ],
2080                'experimentStatus': 'starting',
2081                'experimentAccess': { 'X509' : expcert }
2082            }
2083
2084        return rv
2085   
2086    def get_experiment_fedid(self, key):
2087        """
2088        find the fedid associated with the localname key in the state database.
2089        """
2090
2091        rv = None
2092        self.state_lock.acquire()
2093        if self.state.has_key(key):
2094            if isinstance(self.state[key], dict):
2095                try:
2096                    kl = [ f['fedid'] for f in \
2097                            self.state[key]['experimentID']\
2098                                if f.has_key('fedid') ]
2099                except KeyError:
2100                    self.state_lock.release()
2101                    raise service_error(service_error.internal, 
2102                            "No fedid for experiment %s when getting "+\
2103                                    "fedid(!?)" % key)
2104                if len(kl) == 1:
2105                    rv = kl[0]
2106                else:
2107                    self.state_lock.release()
2108                    raise service_error(service_error.internal, 
2109                            "multiple fedids for experiment %s when " +\
2110                                    "getting fedid(!?)" % key)
2111            else:
2112                self.state_lock.release()
2113                raise service_error(service_error.internal, 
2114                        "Unexpected state for %s" % key)
2115        self.state_lock.release()
2116        return rv
2117
2118    def check_experiment_access(self, fid, key):
2119        """
2120        Confirm that the fid has access to the experiment.  Though a request
2121        may be made in terms of a local name, the access attribute is always
2122        the experiment's fedid.
2123        """
2124        if not isinstance(key, fedid):
2125            key = self.get_experiment_fedid(key)
2126
2127        if self.auth.check_attribute(fid, key):
2128            return True
2129        else:
2130            raise service_error(service_error.access, "Access Denied")
2131
2132
2133
2134    def get_vtopo(self, req, fid):
2135        """
2136        Return the stored virtual topology for this experiment
2137        """
2138        rv = None
2139        state = None
2140
2141        req = req.get('VtopoRequestBody', None)
2142        if not req:
2143            raise service_error(service_error.req,
2144                    "Bad request format (no VtopoRequestBody)")
2145        exp = req.get('experiment', None)
2146        if exp:
2147            if exp.has_key('fedid'):
2148                key = exp['fedid']
2149                keytype = "fedid"
2150            elif exp.has_key('localname'):
2151                key = exp['localname']
2152                keytype = "localname"
2153            else:
2154                raise service_error(service_error.req, "Unknown lookup type")
2155        else:
2156            raise service_error(service_error.req, "No request?")
2157
2158        self.check_experiment_access(fid, key)
2159
2160        self.state_lock.acquire()
2161        if self.state.has_key(key):
2162            if self.state[key].has_key('vtopo'):
2163                rv = { 'experiment' : {keytype: key },\
2164                        'vtopo': self.state[key]['vtopo'],\
2165                    }
2166            else:
2167                state = self.state[key]['experimentStatus']
2168        self.state_lock.release()
2169
2170        if rv: return rv
2171        else: 
2172            if state:
2173                raise service_error(service_error.partial, 
2174                        "Not ready: %s" % state)
2175            else:
2176                raise service_error(service_error.req, "No such experiment")
2177
2178    def get_vis(self, req, fid):
2179        """
2180        Return the stored visualization for this experiment
2181        """
2182        rv = None
2183        state = None
2184
2185        req = req.get('VisRequestBody', None)
2186        if not req:
2187            raise service_error(service_error.req,
2188                    "Bad request format (no VisRequestBody)")
2189        exp = req.get('experiment', None)
2190        if exp:
2191            if exp.has_key('fedid'):
2192                key = exp['fedid']
2193                keytype = "fedid"
2194            elif exp.has_key('localname'):
2195                key = exp['localname']
2196                keytype = "localname"
2197            else:
2198                raise service_error(service_error.req, "Unknown lookup type")
2199        else:
2200            raise service_error(service_error.req, "No request?")
2201
2202        self.check_experiment_access(fid, key)
2203
2204        self.state_lock.acquire()
2205        if self.state.has_key(key):
2206            if self.state[key].has_key('vis'):
2207                rv =  { 'experiment' : {keytype: key },\
2208                        'vis': self.state[key]['vis'],\
2209                        }
2210            else:
2211                state = self.state[key]['experimentStatus']
2212        self.state_lock.release()
2213
2214        if rv: return rv
2215        else:
2216            if state:
2217                raise service_error(service_error.partial, 
2218                        "Not ready: %s" % state)
2219            else:
2220                raise service_error(service_error.req, "No such experiment")
2221
2222    def clean_info_response(self, rv):
2223        """
2224        Remove the information in the experiment's state object that is not in
2225        the info response.
2226        """
2227        # Remove the owner info (should always be there, but...)
2228        if rv.has_key('owner'): del rv['owner']
2229
2230        # Convert the log into the allocationLog parameter and remove the
2231        # log entry (with defensive programming)
2232        if rv.has_key('log'):
2233            rv['allocationLog'] = "".join(rv['log'])
2234            del rv['log']
2235        else:
2236            rv['allocationLog'] = ""
2237
2238        if rv['experimentStatus'] != 'active':
2239            if rv.has_key('federant'): del rv['federant']
2240        else:
2241            # remove the allocationID info from each federant
2242            for f in rv.get('federant', []):
2243                if f.has_key('allocID'): del f['allocID']
2244        return rv
2245
2246    def get_info(self, req, fid):
2247        """
2248        Return all the stored info about this experiment
2249        """
2250        rv = None
2251
2252        req = req.get('InfoRequestBody', None)
2253        if not req:
2254            raise service_error(service_error.req,
2255                    "Bad request format (no InfoRequestBody)")
2256        exp = req.get('experiment', None)
2257        if exp:
2258            if exp.has_key('fedid'):
2259                key = exp['fedid']
2260                keytype = "fedid"
2261            elif exp.has_key('localname'):
2262                key = exp['localname']
2263                keytype = "localname"
2264            else:
2265                raise service_error(service_error.req, "Unknown lookup type")
2266        else:
2267            raise service_error(service_error.req, "No request?")
2268
2269        self.check_experiment_access(fid, key)
2270
2271        # The state may be massaged by the service function that called
2272        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2273        # state.
2274        self.state_lock.acquire()
2275        if self.state.has_key(key):
2276            rv = copy.deepcopy(self.state[key])
2277        self.state_lock.release()
2278
2279        if rv:
2280            return self.clean_info_response(rv)
2281        else:
2282            raise service_error(service_error.req, "No such experiment")
2283
2284    def get_multi_info(self, req, fid):
2285        """
2286        Return all the stored info that this fedid can access
2287        """
2288        rv = { 'info': [ ] }
2289
2290        self.state_lock.acquire()
2291        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2292            self.check_experiment_access(fid, key)
2293
2294            if self.state.has_key(key):
2295                e = copy.deepcopy(self.state[key])
2296                e = self.clean_info_response(e)
2297                rv['info'].append(e)
2298        self.state_lock.release()
2299        return rv
2300
2301
2302    def terminate_experiment(self, req, fid):
2303        """
2304        Swap this experiment out on the federants and delete the shared
2305        information
2306        """
2307        tbparams = { }
2308        req = req.get('TerminateRequestBody', None)
2309        if not req:
2310            raise service_error(service_error.req,
2311                    "Bad request format (no TerminateRequestBody)")
2312        force = req.get('force', False)
2313        exp = req.get('experiment', None)
2314        if exp:
2315            if exp.has_key('fedid'):
2316                key = exp['fedid']
2317                keytype = "fedid"
2318            elif exp.has_key('localname'):
2319                key = exp['localname']
2320                keytype = "localname"
2321            else:
2322                raise service_error(service_error.req, "Unknown lookup type")
2323        else:
2324            raise service_error(service_error.req, "No request?")
2325
2326        self.check_experiment_access(fid, key)
2327
2328        dealloc_list = [ ]
2329
2330
2331        # Create a logger that logs to the dealloc_list as well as to the main
2332        # log file.
2333        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2334        h = logging.StreamHandler(self.list_log(dealloc_list))
2335        # XXX: there should be a global one of these rather than repeating the
2336        # code.
2337        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2338                    '%d %b %y %H:%M:%S'))
2339        dealloc_log.addHandler(h)
2340
2341        self.state_lock.acquire()
2342        fed_exp = self.state.get(key, None)
2343
2344        if fed_exp:
2345            # This branch of the conditional holds the lock to generate a
2346            # consistent temporary tbparams variable to deallocate experiments.
2347            # It releases the lock to do the deallocations and reacquires it to
2348            # remove the experiment state when the termination is complete.
2349
2350            # First make sure that the experiment creation is complete.
2351            status = fed_exp.get('experimentStatus', None)
2352
2353            if status:
2354                if status in ('starting', 'terminating'):
2355                    if not force:
2356                        self.state_lock.release()
2357                        raise service_error(service_error.partial, 
2358                                'Experiment still being created or destroyed')
2359                    else:
2360                        self.log.warning('Experiment in %s state ' % status + \
2361                                'being terminated by force.')
2362            else:
2363                # No status??? trouble
2364                self.state_lock.release()
2365                raise service_error(service_error.internal,
2366                        "Experiment has no status!?")
2367
2368            ids = []
2369            #  experimentID is a list of dicts that are self-describing
2370            #  identifiers.  This finds all the fedids and localnames - the
2371            #  keys of self.state - and puts them into ids.
2372            for id in fed_exp.get('experimentID', []):
2373                if id.has_key('fedid'): ids.append(id['fedid'])
2374                if id.has_key('localname'): ids.append(id['localname'])
2375
2376            # Construct enough of the tbparams to make the stop_segment calls
2377            # work
2378            for fed in fed_exp.get('federant', []):
2379                try:
2380                    for e in fed['name']:
2381                        eid = e.get('localname', None)
2382                        if eid: break
2383                    else:
2384                        continue
2385
2386                    p = fed['emulab']['project']
2387
2388                    project = p['name']['localname']
2389                    tb = p['testbed']['localname']
2390                    user = p['user'][0]['userID']['localname']
2391
2392                    domain = fed['emulab']['domain']
2393                    host  = fed['emulab']['ops']
2394                    aid = fed['allocID']
2395                except KeyError, e:
2396                    continue
2397                tbparams[tb] = {\
2398                        'user': user,\
2399                        'domain': domain,\
2400                        'project': project,\
2401                        'host': host,\
2402                        'eid': eid,\
2403                        'aid': aid,\
2404                    }
2405            fed_exp['experimentStatus'] = 'terminating'
2406            if self.state_filename: self.write_state()
2407            self.state_lock.release()
2408
2409            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2410            # then completes, so we can't wait if nothing starts.  So, no
2411            # tbparams, no start.
2412            if len(tbparams) > 0:
2413                thread_pool = self.thread_pool(self.nthreads)
2414                for tb in tbparams.keys():
2415                    # Create and start a thread to stop the segment
2416                    thread_pool.wait_for_slot()
2417                    t  = self.pooled_thread(\
2418                            target=self.stop_segment(log=dealloc_log,
2419                                keyfile=self.ssh_privkey_file, debug=self.debug), 
2420                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2421                            pdata=thread_pool, trace_file=self.trace_file)
2422                    t.start()
2423                # Wait for completions
2424                thread_pool.wait_for_all_done()
2425
2426            # release the allocations (failed experiments have done this
2427            # already, and starting experiments may be in odd states, so we
2428            # ignore errors releasing those allocations
2429            try: 
2430                for tb in tbparams.keys():
2431                    self.release_access(tb, tbparams[tb]['aid'])
2432            except service_error, e:
2433                if status != 'failed' and not force:
2434                    raise e
2435
2436            # Remove the terminated experiment
2437            self.state_lock.acquire()
2438            for id in ids:
2439                if self.state.has_key(id): del self.state[id]
2440
2441            if self.state_filename: self.write_state()
2442            self.state_lock.release()
2443
2444            return { 
2445                    'experiment': exp , 
2446                    'deallocationLog': "".join(dealloc_list),
2447                    }
2448        else:
2449            # Don't forget to release the lock
2450            self.state_lock.release()
2451            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.