source: fedd/federation/experiment_control.py @ d5e3b8e

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since d5e3b8e was d5e3b8e, checked in by Ted Faber <faber@…>, 15 years ago

Hopefully fixed a timeout bug

  • Property mode set to 100644
File size: 89.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
550            """
551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
555
556            try:
557                dnull = open("/dev/null", "r")
558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
568                else:
569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
580                    else:
581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
585                else:
586                    return sub.wait() == 0
587            else:
588                if timeout == 0:
589                    self.log.debug("debug timeout raised on %s " % sh_str)
590                    raise self.ssh_cmd_timeout()
591                else:
592                    return True
593
594    class start_segment(emulab_segment):
595        def __init__(self, log=None, keyfile=None, debug=False):
596            experiment_control_local.emulab_segment.__init__(self,
597                    log=log, keyfile=keyfile, debug=debug)
598
599        def create_config_tree(self, src_dir, dest_dir, script):
600            """
601            Append commands to script that will create the directory hierarchy
602            on the remote federant.
603            """
604
605            if os.path.isdir(src_dir):
606                print >>script, "mkdir -p %s" % dest_dir
607                print >>script, "chmod 770 %s" % dest_dir
608
609                for f in os.listdir(src_dir):
610                    if os.path.isdir(f):
611                        self.create_config_tree("%s/%s" % (src_dir, f), 
612                                "%s/%s" % (dest_dir, f), script)
613            else:
614                self.log.debug("[create_config_tree]: Not a directory: %s" \
615                        % src_dir)
616
617        def ship_configs(self, host, user, src_dir, dest_dir):
618            """
619            Copy federant-specific configuration files to the federant.
620            """
621            for f in os.listdir(src_dir):
622                if os.path.isdir(f):
623                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
624                            "%s/%s" % (dest_dir, f)):
625                        return False
626                else:
627                    if not self.scp_file("%s/%s" % (src_dir, f), 
628                            user, host, dest_dir):
629                        return False
630            return True
631
632        def get_state(self, user, host, tb, pid, eid):
633            # command to test experiment state
634            expinfo_exec = "/usr/testbed/bin/expinfo" 
635            # Regular expressions to parse the expinfo response
636            state_re = re.compile("State:\s+(\w+)")
637            no_exp_re = re.compile("^No\s+such\s+experiment")
638            swapping_re = re.compile("^No\s+information\s+available.")
639            state = None    # Experiment state parsed from expinfo
640            # The expinfo ssh command.  Note the identity restriction to use
641            # only the identity provided in the pubkey given.
642            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
643                    'StrictHostKeyChecking yes', '-i', 
644                    self.ssh_privkey_file, "%s@%s" % (user, host), 
645                    expinfo_exec, pid, eid]
646
647            dev_null = None
648            try:
649                dev_null = open("/dev/null", "a")
650            except IOError, e:
651                self.log.error("[get_state]: can't open /dev/null: %s" %e)
652
653            if self.debug:
654                state = 'swapped'
655                rv = 0
656            else:
657                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
658                        close_fds=True)
659                for line in status.stdout:
660                    m = state_re.match(line)
661                    if m: state = m.group(1)
662                    else:
663                        for reg, st in ((no_exp_re, "none"),
664                                (swapping_re, "swapping")):
665                            m = reg.match(line)
666                            if m: state = st
667                rv = status.wait()
668
669            # If the experiment is not present the subcommand returns a
670            # non-zero return value.  If we successfully parsed a "none"
671            # outcome, ignore the return code.
672            if rv != 0 and state != 'none':
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            elif state not in ('active', 'swapped', 'swapping', 'none'):
677                raise service_error(service_error.internal,
678                        "Cannot get status of segment %s:%s/%s" % \
679                                (tb, pid, eid))
680            else: return state
681
682
683        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
684            """
685            Start a sub-experiment on a federant.
686
687            Get the current state, modify or create as appropriate, ship data
688            and configs and start the experiment.  There are small ordering
689            differences based on the initial state of the sub-experiment.
690            """
691            # ops node in the federant
692            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
693            user = tbparams[tb]['user']     # federant user
694            pid = tbparams[tb]['project']   # federant project
695            # XXX
696            base_confs = ( "hosts",)
697            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
698            # Configuration directories on the remote machine
699            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
700            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
701            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
702
703            state = self.get_state(user, host, tb, pid, eid)
704
705            self.log.debug("[start_segment]: %s: %s" % (tb, state))
706            self.log.info("[start_segment]:transferring experiment to %s" % tb)
707
708            if not self.scp_file("%s/%s/%s" % \
709                    (tmpdir, tb, tclfile), user, host):
710                return False
711           
712            if state == 'none':
713                # Create a null copy of the experiment so that we capture any
714                # logs there if the modify fails.  Emulab software discards the
715                # logs from a failed startexp
716                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
717                    return False
718                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
719                timedout = False
720                try:
721                    if not self.ssh_cmd(user, host,
722                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
723                            "-e %s null.tcl") % (pid, eid), "startexp",
724                            timeout=60 * 10):
725                        return False
726                except self.ssh_cmd_timeout:
727                    timedout = True
728
729                if timedout:
730                    state = self.get_state(user, host, tb, eid, pid)
731                    if state != "swapped":
732                        return False
733
734           
735            # Open up a temporary file to contain a script for setting up the
736            # filespace for the new experiment.
737            self.log.info("[start_segment]: creating script file")
738            try:
739                sf, scriptname = tempfile.mkstemp()
740                scriptfile = os.fdopen(sf, 'w')
741            except IOError:
742                return False
743
744            scriptbase = os.path.basename(scriptname)
745
746            # Script the filesystem changes
747            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
748            # Clear and create the tarfiles and rpm directories
749            for d in (tarfiles_dir, rpms_dir):
750                print >>scriptfile, "/bin/rm -rf %s/*" % d
751                print >>scriptfile, "mkdir -p %s" % d
752            print >>scriptfile, 'mkdir -p %s' % proj_dir
753            self.create_config_tree("%s/%s" % (tmpdir, tb),
754                    proj_dir, scriptfile)
755            if os.path.isdir("%s/tarfiles" % tmpdir):
756                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
757                        scriptfile)
758            if os.path.isdir("%s/rpms" % tmpdir):
759                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
760                        scriptfile)
761            print >>scriptfile, "rm -f %s" % scriptbase
762            scriptfile.close()
763
764            # Move the script to the remote machine
765            # XXX: could collide tempfile names on the remote host
766            if self.scp_file(scriptname, user, host, scriptbase):
767                os.remove(scriptname)
768            else:
769                return False
770
771            # Execute the script (and the script's last line deletes it)
772            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
773                return False
774
775            for f in base_confs:
776                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
777                        "%s/%s" % (proj_dir, f)):
778                    return False
779            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
780                    proj_dir):
781                return False
782            if os.path.isdir("%s/tarfiles" % tmpdir):
783                if not self.ship_configs(host, user,
784                        "%s/tarfiles" % tmpdir, tarfiles_dir):
785                    return False
786            if os.path.isdir("%s/rpms" % tmpdir):
787                if not self.ship_configs(host, user,
788                        "%s/rpms" % tmpdir, tarfiles_dir):
789                    return False
790            # Stage the new configuration (active experiments will stay swapped
791            # in now)
792            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
793            try:
794                if not self.ssh_cmd(user, host,
795                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
796                                (pid, eid, tclfile),
797                        "modexp", timeout= 60 * 10):
798                    return False
799            except self.ssh_cmd_timeout:
800                self.log.error("Modify command failed to complete in time")
801                # There's really no way to see if this succeeded or failed, so
802                # if it hangs, assume the worst.
803                return False
804            # Active experiments are still swapped, this swaps the others in.
805            if state != 'active':
806                self.log.info("[start_segment]: Swapping %s in on %s" % \
807                        (eid, tb))
808                timedout = False
809                try:
810                    if not self.ssh_cmd(user, host,
811                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
812                            "swapexp", timeout=10*60):
813                        return False
814                except self.ssh_cmd_timeout:
815                    timedout = True
816               
817                # If the command was terminated, but completed successfully,
818                # report success.
819                if timedout:
820                    self.log.debug("[start_segment]: swapin timed out " +\
821                            "checking state")
822                    state = self.get_state(user, host, tb, eid, pid)
823                    self.log.debug("[start_segment]: state is %s" % state)
824                    return state == 'active'
825            # Everything has gone OK.
826            return True
827
828    class stop_segment(emulab_segment):
829        def __init__(self, log=None, keyfile=None, debug=False):
830            experiment_control_local.emulab_segment.__init__(self,
831                    log=log, keyfile=keyfile, debug=debug)
832
833        def __call__(self, tb, eid, tbparams):
834            """
835            Stop a sub experiment by calling swapexp on the federant
836            """
837            user = tbparams[tb]['user']
838            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
839            pid = tbparams[tb]['project']
840
841            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
842            return self.ssh_cmd(user, host,
843                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
844
845       
846    def generate_ssh_keys(self, dest, type="rsa" ):
847        """
848        Generate a set of keys for the gateways to use to talk.
849
850        Keys are of type type and are stored in the required dest file.
851        """
852        valid_types = ("rsa", "dsa")
853        t = type.lower();
854        if t not in valid_types: raise ValueError
855        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
856
857        try:
858            trace = open("/dev/null", "w")
859        except IOError:
860            raise service_error(service_error.internal,
861                    "Cannot open /dev/null??");
862
863        # May raise CalledProcessError
864        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
865        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
866        if rv != 0:
867            raise service_error(service_error.internal, 
868                    "Cannot generate nonce ssh keys.  %s return code %d" \
869                            % (self.ssh_keygen, rv))
870
871    def gentopo(self, str):
872        """
873        Generate the topology dtat structure from the splitter's XML
874        representation of it.
875
876        The topology XML looks like:
877            <experiment>
878                <nodes>
879                    <node><vname></vname><ips>ip1:ip2</ips></node>
880                </nodes>
881                <lans>
882                    <lan>
883                        <vname></vname><vnode></vnode><ip></ip>
884                        <bandwidth></bandwidth><member>node:port</member>
885                    </lan>
886                </lans>
887        """
888        class topo_parse:
889            """
890            Parse the topology XML and create the dats structure.
891            """
892            def __init__(self):
893                # Typing of the subelements for data conversion
894                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
895                self.int_subelements = ( 'bandwidth',)
896                self.float_subelements = ( 'delay',)
897                # The final data structure
898                self.nodes = [ ]
899                self.lans =  [ ]
900                self.topo = { \
901                        'node': self.nodes,\
902                        'lan' : self.lans,\
903                    }
904                self.element = { }  # Current element being created
905                self.chars = ""     # Last text seen
906
907            def end_element(self, name):
908                # After each sub element the contents is added to the current
909                # element or to the appropriate list.
910                if name == 'node':
911                    self.nodes.append(self.element)
912                    self.element = { }
913                elif name == 'lan':
914                    self.lans.append(self.element)
915                    self.element = { }
916                elif name in self.str_subelements:
917                    self.element[name] = self.chars
918                    self.chars = ""
919                elif name in self.int_subelements:
920                    self.element[name] = int(self.chars)
921                    self.chars = ""
922                elif name in self.float_subelements:
923                    self.element[name] = float(self.chars)
924                    self.chars = ""
925
926            def found_chars(self, data):
927                self.chars += data.rstrip()
928
929
930        tp = topo_parse();
931        parser = xml.parsers.expat.ParserCreate()
932        parser.EndElementHandler = tp.end_element
933        parser.CharacterDataHandler = tp.found_chars
934
935        parser.Parse(str)
936
937        return tp.topo
938       
939
940    def genviz(self, topo):
941        """
942        Generate the visualization the virtual topology
943        """
944
945        neato = "/usr/local/bin/neato"
946        # These are used to parse neato output and to create the visualization
947        # file.
948        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
949        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
950                "%s</type></node>"
951
952        try:
953            # Node names
954            nodes = [ n['vname'] for n in topo['node'] ]
955            topo_lans = topo['lan']
956        except KeyError:
957            raise service_error(service_error.internal, "Bad topology")
958
959        lans = { }
960        links = { }
961
962        # Walk through the virtual topology, organizing the connections into
963        # 2-node connections (links) and more-than-2-node connections (lans).
964        # When a lan is created, it's added to the list of nodes (there's a
965        # node in the visualization for the lan).
966        for l in topo_lans:
967            if links.has_key(l['vname']):
968                if len(links[l['vname']]) < 2:
969                    links[l['vname']].append(l['vnode'])
970                else:
971                    nodes.append(l['vname'])
972                    lans[l['vname']] = links[l['vname']]
973                    del links[l['vname']]
974                    lans[l['vname']].append(l['vnode'])
975            elif lans.has_key(l['vname']):
976                lans[l['vname']].append(l['vnode'])
977            else:
978                links[l['vname']] = [ l['vnode'] ]
979
980
981        # Open up a temporary file for dot to turn into a visualization
982        try:
983            df, dotname = tempfile.mkstemp()
984            dotfile = os.fdopen(df, 'w')
985        except IOError:
986            raise service_error(service_error.internal,
987                    "Failed to open file in genviz")
988
989        # Generate a dot/neato input file from the links, nodes and lans
990        try:
991            print >>dotfile, "graph G {"
992            for n in nodes:
993                print >>dotfile, '\t"%s"' % n
994            for l in links.keys():
995                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
996            for l in lans.keys():
997                for n in lans[l]:
998                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
999            print >>dotfile, "}"
1000            dotfile.close()
1001        except TypeError:
1002            raise service_error(service_error.internal,
1003                    "Single endpoint link in vtopo")
1004        except IOError:
1005            raise service_error(service_error.internal, "Cannot write dot file")
1006
1007        # Use dot to create a visualization
1008        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1009                '-Gpack=true', dotname], stdout=PIPE, close_fds=True)
1010
1011        # Translate dot to vis format
1012        vis_nodes = [ ]
1013        vis = { 'node': vis_nodes }
1014        for line in dot.stdout:
1015            m = vis_re.match(line)
1016            if m:
1017                vn = m.group(1)
1018                vis_node = {'name': vn, \
1019                        'x': float(m.group(2)),\
1020                        'y' : float(m.group(3)),\
1021                    }
1022                if vn in links.keys() or vn in lans.keys():
1023                    vis_node['type'] = 'lan'
1024                else:
1025                    vis_node['type'] = 'node'
1026                vis_nodes.append(vis_node)
1027        rv = dot.wait()
1028
1029        os.remove(dotname)
1030        if rv == 0 : return vis
1031        else: return None
1032
1033    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1034            access_user):
1035        """
1036        Get access to testbed through fedd and set the parameters for that tb
1037        """
1038        uri = self.tbmap.get(tb, None)
1039        if not uri:
1040            raise service_error(serice_error.server_config, 
1041                    "Unknown testbed: %s" % tb)
1042
1043        # currently this lumps all users into one service access group
1044        service_keys = [ a for u in user \
1045                for a in u.get('access', []) \
1046                    if a.has_key('sshPubkey')]
1047
1048        if len(service_keys) == 0:
1049            raise service_error(service_error.req, 
1050                    "Must have at least one SSH pubkey for services")
1051
1052
1053        for p, u in access_user:
1054            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1055                    "to %s") %  ((p or "None"), u, uri))
1056
1057            if p:
1058                # Request with user and project specified
1059                req = {\
1060                        'destinationTestbed' : { 'uri' : uri },
1061                        'project': { 
1062                            'name': {'localname': p},
1063                            'user': [ {'userID': { 'localname': u } } ],
1064                            },
1065                        'user':  user,
1066                        'allocID' : { 'localname': 'test' },
1067                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1068                        'serviceAccess' : service_keys
1069                    }
1070            else:
1071                # Request with only user specified
1072                req = {\
1073                        'destinationTestbed' : { 'uri' : uri },
1074                        'user':  [ {'userID': { 'localname': u } } ],
1075                        'allocID' : { 'localname': 'test' },
1076                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1077                        'serviceAccess' : service_keys
1078                    }
1079
1080            if tb == master:
1081                # NB, the export_project parameter is a dict that includes
1082                # the type
1083                req['exportProject'] = export_project
1084
1085            # node resources if any
1086            if nodes != None and len(nodes) > 0:
1087                rnodes = [ ]
1088                for n in nodes:
1089                    rn = { }
1090                    image, hw, count = n.split(":")
1091                    if image: rn['image'] = [ image ]
1092                    if hw: rn['hardware'] = [ hw ]
1093                    if count and int(count) >0 : rn['count'] = int(count)
1094                    rnodes.append(rn)
1095                req['resources']= { }
1096                req['resources']['node'] = rnodes
1097
1098            try:
1099                if self.local_access.has_key(uri):
1100                    # Local access call
1101                    req = { 'RequestAccessRequestBody' : req }
1102                    r = self.local_access[uri].RequestAccess(req, 
1103                            fedid(file=self.cert_file))
1104                    r = { 'RequestAccessResponseBody' : r }
1105                else:
1106                    r = self.call_RequestAccess(uri, req, 
1107                            self.cert_file, self.cert_pwd, self.trusted_certs)
1108            except service_error, e:
1109                if e.code == service_error.access:
1110                    self.log.debug("[get_access] Access denied")
1111                    r = None
1112                    continue
1113                else:
1114                    raise e
1115
1116            if r.has_key('RequestAccessResponseBody'):
1117                # Through to here we have a valid response, not a fault.
1118                # Access denied is a fault, so something better or worse than
1119                # access denied has happened.
1120                r = r['RequestAccessResponseBody']
1121                self.log.debug("[get_access] Access granted")
1122                break
1123            else:
1124                raise service_error(service_error.protocol,
1125                        "Bad proxy response")
1126       
1127        if not r:
1128            raise service_error(service_error.access, 
1129                    "Access denied by %s (%s)" % (tb, uri))
1130
1131        e = r['emulab']
1132        p = e['project']
1133        tbparam[tb] = { 
1134                "boss": e['boss'],
1135                "host": e['ops'],
1136                "domain": e['domain'],
1137                "fs": e['fileServer'],
1138                "eventserver": e['eventServer'],
1139                "project": unpack_id(p['name']),
1140                "emulab" : e,
1141                "allocID" : r['allocID'],
1142                }
1143        # Make the testbed name be the label the user applied
1144        p['testbed'] = {'localname': tb }
1145
1146        for u in p['user']:
1147            role = u.get('role', None)
1148            if role == 'experimentCreation':
1149                tbparam[tb]['user'] = unpack_id(u['userID'])
1150                break
1151        else:
1152            raise service_error(service_error.internal, 
1153                    "No createExperimentUser from %s" %tb)
1154
1155        # Add attributes to barameter space.  We don't allow attributes to
1156        # overlay any parameters already installed.
1157        for a in e['fedAttr']:
1158            try:
1159                if a['attribute'] and isinstance(a['attribute'], basestring)\
1160                        and not tbparam[tb].has_key(a['attribute'].lower()):
1161                    tbparam[tb][a['attribute'].lower()] = a['value']
1162            except KeyError:
1163                self.log.error("Bad attribute in response: %s" % a)
1164       
1165    def release_access(self, tb, aid):
1166        """
1167        Release access to testbed through fedd
1168        """
1169
1170        uri = self.tbmap.get(tb, None)
1171        if not uri:
1172            raise service_error(serice_error.server_config, 
1173                    "Unknown testbed: %s" % tb)
1174
1175        if self.local_access.has_key(uri):
1176            resp = self.local_access[uri].ReleaseAccess(\
1177                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1178                    fedid(file=self.cert_file))
1179            resp = { 'ReleaseAccessResponseBody': resp } 
1180        else:
1181            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1182                    self.cert_file, self.cert_pwd, self.trusted_certs)
1183
1184        # better error coding
1185
1186    def remote_splitter(self, uri, desc, master):
1187
1188        req = {
1189                'description' : { 'ns2description': desc },
1190                'master': master,
1191                'include_fedkit': bool(self.fedkit),
1192                'include_gatewaykit': bool(self.gatewaykit)
1193            }
1194
1195        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1196                self.trusted_certs)
1197
1198        if r.has_key('Ns2SplitResponseBody'):
1199            r = r['Ns2SplitResponseBody']
1200            if r.has_key('output'):
1201                return r['output'].splitlines()
1202            else:
1203                raise service_error(service_error.protocol, 
1204                        "Bad splitter response (no output)")
1205        else:
1206            raise service_error(service_error.protocol, "Bad splitter response")
1207       
1208    class current_testbed:
1209        """
1210        Object for collecting the current testbed description.  The testbed
1211        description is saved to a file with the local testbed variables
1212        subsittuted line by line.
1213        """
1214        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1215            def tar_list_to_string(tl):
1216                if tl is None: return None
1217
1218                rv = ""
1219                for t in tl:
1220                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1221                            (t[0], os.path.basename(t[1]))
1222                return rv
1223
1224
1225            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1226            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1227            self.current_testbed = None
1228            self.testbed_file = None
1229
1230            self.def_expstart = \
1231                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1232            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1233            self.def_gwstart = \
1234                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1235            self.def_mgwstart = \
1236                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1237            self.def_gwimage = "FBSD61-TUNNEL2";
1238            self.def_gwtype = "pc";
1239            self.def_mgwcmd = '# '
1240            self.def_mgwcmdparams = ''
1241            self.def_gwcmd = '# '
1242            self.def_gwcmdparams = ''
1243
1244            self.eid = eid
1245            self.tmpdir = tmpdir
1246            # Convert fedkit and gateway kit (which are lists of tuples) into a
1247            # substituition string.
1248            self.fedkit = tar_list_to_string(fedkit)
1249            self.gatewaykit = tar_list_to_string(gatewaykit)
1250
1251        def __call__(self, line, master, allocated, tbparams):
1252            # Capture testbed topology descriptions
1253            if self.current_testbed == None:
1254                m = self.begin_testbed.match(line)
1255                if m != None:
1256                    self.current_testbed = m.group(1)
1257                    if self.current_testbed == None:
1258                        raise service_error(service_error.req,
1259                                "Bad request format (unnamed testbed)")
1260                    allocated[self.current_testbed] = \
1261                            allocated.get(self.current_testbed,0) + 1
1262                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1263                    if not os.path.exists(tb_dir):
1264                        try:
1265                            os.mkdir(tb_dir)
1266                        except IOError:
1267                            raise service_error(service_error.internal,
1268                                    "Cannot create %s" % tb_dir)
1269                    try:
1270                        self.testbed_file = open("%s/%s.%s.tcl" %
1271                                (tb_dir, self.eid, self.current_testbed), 'w')
1272                    except IOError:
1273                        self.testbed_file = None
1274                    return True
1275                else: return False
1276            else:
1277                m = self.end_testbed.match(line)
1278                if m != None:
1279                    if m.group(1) != self.current_testbed:
1280                        raise service_error(service_error.internal, 
1281                                "Mismatched testbed markers!?")
1282                    if self.testbed_file != None: 
1283                        self.testbed_file.close()
1284                        self.testbed_file = None
1285                    self.current_testbed = None
1286                elif self.testbed_file:
1287                    # Substitute variables and put the line into the local
1288                    # testbed file.
1289                    gwtype = tbparams[self.current_testbed].get(\
1290                            'connectortype', self.def_gwtype)
1291                    gwimage = tbparams[self.current_testbed].get(\
1292                            'connectorimage', self.def_gwimage)
1293                    mgwstart = tbparams[self.current_testbed].get(\
1294                            'masterconnectorstartcmd', self.def_mgwstart)
1295                    mexpstart = tbparams[self.current_testbed].get(\
1296                            'masternodestartcmd', self.def_mexpstart)
1297                    gwstart = tbparams[self.current_testbed].get(\
1298                            'slaveconnectorstartcmd', self.def_gwstart)
1299                    expstart = tbparams[self.current_testbed].get(\
1300                            'slavenodestartcmd', self.def_expstart)
1301                    project = tbparams[self.current_testbed].get('project')
1302                    gwcmd = tbparams[self.current_testbed].get(\
1303                            'slaveconnectorcmd', self.def_gwcmd)
1304                    gwcmdparams = tbparams[self.current_testbed].get(\
1305                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1306                    mgwcmd = tbparams[self.current_testbed].get(\
1307                            'masterconnectorcmd', self.def_gwcmd)
1308                    mgwcmdparams = tbparams[self.current_testbed].get(\
1309                            'masterconnectorcmdparams', self.def_gwcmdparams)
1310                    line = re.sub("GWTYPE", gwtype, line)
1311                    line = re.sub("GWIMAGE", gwimage, line)
1312                    if self.current_testbed == master:
1313                        line = re.sub("GWSTART", mgwstart, line)
1314                        line = re.sub("EXPSTART", mexpstart, line)
1315                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1316                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1317                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1318                    else:
1319                        line = re.sub("GWSTART", gwstart, line)
1320                        line = re.sub("EXPSTART", expstart, line)
1321                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1322                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1323                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1324                    #These expansions contain EID and PROJDIR.  NB these are
1325                    # local fedkit and gatewaykit, which are strings.
1326                    if self.fedkit:
1327                        line = re.sub("FEDKIT", self.fedkit, line)
1328                    if self.gatewaykit:
1329                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1330                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1331                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1332                    line = re.sub("EID", self.eid, line)
1333                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1334                            (project, self.eid), line)
1335                    print >>self.testbed_file, line
1336                return True
1337
1338    class allbeds:
1339        """
1340        Process the Allbeds section.  Get access to each federant and save the
1341        parameters in tbparams
1342        """
1343        def __init__(self, get_access):
1344            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1345            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1346            self.in_allbeds = False
1347            self.get_access = get_access
1348
1349        def __call__(self, line, user, tbparams, master, export_project,
1350                access_user):
1351            # Testbed access parameters
1352            if not self.in_allbeds:
1353                if self.begin_allbeds.match(line):
1354                    self.in_allbeds = True
1355                    return True
1356                else:
1357                    return False
1358            else:
1359                if self.end_allbeds.match(line):
1360                    self.in_allbeds = False
1361                else:
1362                    nodes = line.split('|')
1363                    tb = nodes.pop(0)
1364                    self.get_access(tb, nodes, user, tbparams, master,
1365                            export_project, access_user)
1366                return True
1367
1368    class gateways:
1369        def __init__(self, eid, master, tmpdir, gw_pubkey,
1370                gw_secretkey, copy_file, fedkit):
1371            self.begin_gateways = \
1372                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1373            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1374            self.current_gateways = None
1375            self.control_gateway = None
1376            self.active_end = { }
1377
1378            self.eid = eid
1379            self.master = master
1380            self.tmpdir = tmpdir
1381            self.gw_pubkey_base = gw_pubkey
1382            self.gw_secretkey_base = gw_secretkey
1383
1384            self.copy_file = copy_file
1385            self.fedkit = fedkit
1386
1387
1388        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1389                active_end, tbparams, dtb, myname, desthost, type):
1390            """
1391            Produce a gateway configuration file from a gateways line.
1392            """
1393
1394            sproject = tbparams[gw].get('project', 'project')
1395            dproject = tbparams[dtb].get('project', 'project')
1396            sdomain = ".%s.%s%s" % (eid, sproject,
1397                    tbparams[gw].get('domain', ".example.com"))
1398            ddomain = ".%s.%s%s" % (eid, dproject,
1399                    tbparams[dtb].get('domain', ".example.com"))
1400            boss = tbparams[master].get('boss', "boss")
1401            fs = tbparams[master].get('fs', "fs")
1402            event_server = "%s%s" % \
1403                    (tbparams[gw].get('eventserver', "event_server"),
1404                            tbparams[gw].get('domain', "example.com"))
1405            remote_event_server = "%s%s" % \
1406                    (tbparams[dtb].get('eventserver', "event_server"),
1407                            tbparams[dtb].get('domain', "example.com"))
1408            seer_control = "%s%s" % \
1409                    (tbparams[gw].get('control', "control"), sdomain)
1410            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1411
1412            if self.fedkit:
1413                remote_script_dir = "/usr/local/federation/bin"
1414                local_script_dir = "/usr/local/federation/bin"
1415            else:
1416                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1417                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1418
1419            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1420            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1421            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1422
1423            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1424            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1425
1426            # translate to lower case so the `hostname` hack for specifying
1427            # configuration files works.
1428            conf_file = conf_file.lower();
1429            remote_conf_file = remote_conf_file.lower();
1430
1431            if dtb == master:
1432                active = "false"
1433            elif gw == master:
1434                active = "true"
1435            elif active_end.has_key('%s-%s' % (dtb, gw)):
1436                active = "false"
1437            else:
1438                active_end['%s-%s' % (gw, dtb)] = 1
1439                active = "true"
1440
1441            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1442            print >>gwconfig, "Active: %s" % active
1443            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1444            if tunnel_iface:
1445                print >>gwconfig, "Interface: %s" % tunnel_iface
1446            print >>gwconfig, "BossName: %s" % boss
1447            print >>gwconfig, "FsName: %s" % fs
1448            print >>gwconfig, "EventServerName: %s" % event_server
1449            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1450            print >>gwconfig, "SeerControl: %s" % seer_control
1451            print >>gwconfig, "Type: %s" % type
1452            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1453            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1454                    local_script_dir
1455            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1456            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1457            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1458                    (remote_conf_dir, remote_conf_file)
1459            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1460            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1461            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1462            gwconfig.close()
1463
1464            return active == "true"
1465
1466        def __call__(self, line, allocated, tbparams):
1467            # Process gateways
1468            if not self.current_gateways:
1469                m = self.begin_gateways.match(line)
1470                if m:
1471                    self.current_gateways = m.group(1)
1472                    if allocated.has_key(self.current_gateways):
1473                        # This test should always succeed
1474                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1475                        if not os.path.exists(tb_dir):
1476                            try:
1477                                os.mkdir(tb_dir)
1478                            except IOError:
1479                                raise service_error(service_error.internal,
1480                                        "Cannot create %s" % tb_dir)
1481                    else:
1482                        # XXX
1483                        self.log.error("[gateways]: Ignoring gateways for " + \
1484                                "unknown testbed %s" % self.current_gateways)
1485                        self.current_gateways = None
1486                    return True
1487                else:
1488                    return False
1489            else:
1490                m = self.end_gateways.match(line)
1491                if m :
1492                    if m.group(1) != self.current_gateways:
1493                        raise service_error(service_error.internal,
1494                                "Mismatched gateway markers!?")
1495                    if self.control_gateway:
1496                        try:
1497                            cc = open("%s/%s/client.conf" %
1498                                    (self.tmpdir, self.current_gateways), 'w')
1499                            print >>cc, "ControlGateway: %s" % \
1500                                    self.control_gateway
1501                            if tbparams[self.master].has_key('smbshare'):
1502                                print >>cc, "SMBSHare: %s" % \
1503                                        tbparams[self.master]['smbshare']
1504                            print >>cc, "ProjectUser: %s" % \
1505                                    tbparams[self.master]['user']
1506                            print >>cc, "ProjectName: %s" % \
1507                                    tbparams[self.master]['project']
1508                            print >>cc, "ExperimentID: %s/%s" % \
1509                                    ( tbparams[self.master]['project'], \
1510                                    self.eid )
1511                            cc.close()
1512                        except IOError:
1513                            raise service_error(service_error.internal,
1514                                    "Error creating client config")
1515                        # XXX: This seer specific file should disappear
1516                        try:
1517                            cc = open("%s/%s/seer.conf" %
1518                                    (self.tmpdir, self.current_gateways),
1519                                    'w')
1520                            if self.current_gateways != self.master:
1521                                print >>cc, "ControlNode: %s" % \
1522                                        self.control_gateway
1523                            print >>cc, "ExperimentID: %s/%s" % \
1524                                    ( tbparams[self.master]['project'], \
1525                                    self.eid )
1526                            cc.close()
1527                        except IOError:
1528                            raise service_error(service_error.internal,
1529                                    "Error creating seer config")
1530                    else:
1531                        debug.error("[gateways]: No control gateway for %s" %\
1532                                    self.current_gateways)
1533                    self.current_gateways = None
1534                else:
1535                    dtb, myname, desthost, type = line.split(" ")
1536
1537                    if type == "control" or type == "both":
1538                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1539                                self.eid, 
1540                                tbparams[self.current_gateways]['project'],
1541                                tbparams[self.current_gateways]['domain'])
1542                    try:
1543                        active = self.gateway_conf_file(self.current_gateways,
1544                                self.master, self.eid, self.gw_pubkey_base,
1545                                self.gw_secretkey_base,
1546                                self.active_end, tbparams, dtb, myname,
1547                                desthost, type)
1548                    except IOError, e:
1549                        raise service_error(service_error.internal,
1550                                "Failed to write config file for %s" % \
1551                                        self.current_gateway)
1552           
1553                    gw_pubkey = "%s/keys/%s" % \
1554                            (self.tmpdir, self.gw_pubkey_base)
1555                    gw_secretkey = "%s/keys/%s" % \
1556                            (self.tmpdir, self.gw_secretkey_base)
1557
1558                    pkfile = "%s/%s/%s" % \
1559                            ( self.tmpdir, self.current_gateways, 
1560                                    self.gw_pubkey_base)
1561                    skfile = "%s/%s/%s" % \
1562                            ( self.tmpdir, self.current_gateways, 
1563                                    self.gw_secretkey_base)
1564
1565                    if not os.path.exists(pkfile):
1566                        try:
1567                            self.copy_file(gw_pubkey, pkfile)
1568                        except IOError:
1569                            service_error(service_error.internal,
1570                                    "Failed to copy pubkey file")
1571
1572                    if active and not os.path.exists(skfile):
1573                        try:
1574                            self.copy_file(gw_secretkey, skfile)
1575                        except IOError:
1576                            service_error(service_error.internal,
1577                                    "Failed to copy secretkey file")
1578                return True
1579
1580    class shunt_to_file:
1581        """
1582        Simple class to write data between two regexps to a file.
1583        """
1584        def __init__(self, begin, end, filename):
1585            """
1586            Begin shunting on a match of begin, stop on end, send data to
1587            filename.
1588            """
1589            self.begin = re.compile(begin)
1590            self.end = re.compile(end)
1591            self.in_shunt = False
1592            self.file = None
1593            self.filename = filename
1594
1595        def __call__(self, line):
1596            """
1597            Call this on each line in the input that may be shunted.
1598            """
1599            if not self.in_shunt:
1600                if self.begin.match(line):
1601                    self.in_shunt = True
1602                    try:
1603                        self.file = open(self.filename, "w")
1604                    except:
1605                        self.file = None
1606                        raise
1607                    return True
1608                else:
1609                    return False
1610            else:
1611                if self.end.match(line):
1612                    if self.file: 
1613                        self.file.close()
1614                        self.file = None
1615                    self.in_shunt = False
1616                else:
1617                    if self.file:
1618                        print >>self.file, line
1619                return True
1620
1621    class shunt_to_list:
1622        """
1623        Same interface as shunt_to_file.  Data collected in self.list, one list
1624        element per line.
1625        """
1626        def __init__(self, begin, end):
1627            self.begin = re.compile(begin)
1628            self.end = re.compile(end)
1629            self.in_shunt = False
1630            self.list = [ ]
1631       
1632        def __call__(self, line):
1633            if not self.in_shunt:
1634                if self.begin.match(line):
1635                    self.in_shunt = True
1636                    return True
1637                else:
1638                    return False
1639            else:
1640                if self.end.match(line):
1641                    self.in_shunt = False
1642                else:
1643                    self.list.append(line)
1644                return True
1645
1646    class shunt_to_string:
1647        """
1648        Same interface as shunt_to_file.  Data collected in self.str, all in
1649        one string.
1650        """
1651        def __init__(self, begin, end):
1652            self.begin = re.compile(begin)
1653            self.end = re.compile(end)
1654            self.in_shunt = False
1655            self.str = ""
1656       
1657        def __call__(self, line):
1658            if not self.in_shunt:
1659                if self.begin.match(line):
1660                    self.in_shunt = True
1661                    return True
1662                else:
1663                    return False
1664            else:
1665                if self.end.match(line):
1666                    self.in_shunt = False
1667                else:
1668                    self.str += line
1669                return True
1670
1671    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1672            tbparams, tmpdir, alloc_log=None):
1673        started = { }           # Testbeds where a sub-experiment started
1674                                # successfully
1675
1676        # XXX
1677        fail_soft = False
1678
1679        log = alloc_log or self.log
1680
1681        thread_pool = self.thread_pool(self.nthreads)
1682        threads = [ ]
1683
1684        for tb in [ k for k in allocated.keys() if k != master]:
1685            # Create and start a thread to start the segment, and save it to
1686            # get the return value later
1687            thread_pool.wait_for_slot()
1688            t  = self.pooled_thread(\
1689                    target=self.start_segment(log=log,
1690                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1691                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1692                    pdata=thread_pool, trace_file=self.trace_file)
1693            threads.append(t)
1694            t.start()
1695
1696        # Wait until all finish
1697        thread_pool.wait_for_all_done()
1698
1699        # If none failed, start the master
1700        failed = [ t.getName() for t in threads if not t.rv ]
1701
1702        if len(failed) == 0:
1703            starter = self.start_segment(log=log, 
1704                    keyfile=self.ssh_privkey_file, debug=self.debug)
1705            if not starter(master, eid, tbparams, tmpdir):
1706                failed.append(master)
1707
1708        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1709        # If one failed clean up, unless fail_soft is set
1710        if failed:
1711            if not fail_soft:
1712                thread_pool.clear()
1713                for tb in succeeded:
1714                    # Create and start a thread to stop the segment
1715                    thread_pool.wait_for_slot()
1716                    t  = self.pooled_thread(\
1717                            target=self.stop_segment(log=log,
1718                                keyfile=self.ssh_privkey_file,
1719                                debug=self.debug), 
1720                            args=(tb, eid, tbparams), name=tb,
1721                            pdata=thread_pool, trace_file=self.trace_file)
1722                    t.start()
1723                # Wait until all finish
1724                thread_pool.wait_for_all_done()
1725
1726                # release the allocations
1727                for tb in tbparams.keys():
1728                    self.release_access(tb, tbparams[tb]['allocID'])
1729                # Remove the placeholder
1730                self.state_lock.acquire()
1731                self.state[eid]['experimentStatus'] = 'failed'
1732                if self.state_filename: self.write_state()
1733                self.state_lock.release()
1734
1735                #raise service_error(service_error.federant,
1736                #    "Swap in failed on %s" % ",".join(failed))
1737                log.error("Swap in failed on %s" % ",".join(failed))
1738                return
1739        else:
1740            log.info("[start_segment]: Experiment %s active" % eid)
1741
1742        log.debug("[start_experiment]: removing %s" % tmpdir)
1743
1744        # Walk up tmpdir, deleting as we go
1745        for path, dirs, files in os.walk(tmpdir, topdown=False):
1746            for f in files:
1747                os.remove(os.path.join(path, f))
1748            for d in dirs:
1749                os.rmdir(os.path.join(path, d))
1750        os.rmdir(tmpdir)
1751
1752        # Insert the experiment into our state and update the disk copy
1753        self.state_lock.acquire()
1754        self.state[expid]['experimentStatus'] = 'active'
1755        self.state[eid] = self.state[expid]
1756        if self.state_filename: self.write_state()
1757        self.state_lock.release()
1758        return
1759
1760    def create_experiment(self, req, fid):
1761        """
1762        The external interface to experiment creation called from the
1763        dispatcher.
1764
1765        Creates a working directory, splits the incoming description using the
1766        splitter script and parses out the avrious subsections using the
1767        lcasses above.  Once each sub-experiment is created, use pooled threads
1768        to instantiate them and start it all up.
1769        """
1770
1771        if not self.auth.check_attribute(fid, 'create'):
1772            raise service_error(service_error.access, "Create access denied")
1773
1774        try:
1775            tmpdir = tempfile.mkdtemp(prefix="split-")
1776        except IOError:
1777            raise service_error(service_error.internal, "Cannot create tmp dir")
1778
1779        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1780        gw_secretkey_base = "fed.%s" % self.ssh_type
1781        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1782        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1783        tclfile = tmpdir + "/experiment.tcl"
1784        tbparams = { }
1785        try:
1786            access_user = self.accessdb[fid]
1787        except KeyError:
1788            raise service_error(service_error.internal,
1789                    "Access map and authorizer out of sync in " + \
1790                            "create_experiment for fedid %s"  % fid)
1791
1792        pid = "dummy"
1793        gid = "dummy"
1794        try:
1795            os.mkdir(tmpdir+"/keys")
1796        except OSError:
1797            raise service_error(service_error.internal,
1798                    "Can't make temporary dir")
1799
1800        req = req.get('CreateRequestBody', None)
1801        if not req:
1802            raise service_error(service_error.req,
1803                    "Bad request format (no CreateRequestBody)")
1804        # The tcl parser needs to read a file so put the content into that file
1805        descr=req.get('experimentdescription', None)
1806        if descr:
1807            file_content=descr.get('ns2description', None)
1808            if file_content:
1809                try:
1810                    f = open(tclfile, 'w')
1811                    f.write(file_content)
1812                    f.close()
1813                except IOError:
1814                    raise service_error(service_error.internal,
1815                            "Cannot write temp experiment description")
1816            else:
1817                raise service_error(service_error.req, 
1818                        "Only ns2descriptions supported")
1819        else:
1820            raise service_error(service_error.req, "No experiment description")
1821
1822        # Generate an ID for the experiment (slice) and a certificate that the
1823        # allocator can use to prove they own it.  We'll ship it back through
1824        # the encrypted connection.
1825        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1826
1827        if req.has_key('experimentID') and \
1828                req['experimentID'].has_key('localname'):
1829            eid = req['experimentID']['localname']
1830            self.state_lock.acquire()
1831            while (self.state.has_key(eid)):
1832                eid += random.choice(string.ascii_letters)
1833            # Initial state
1834            self.state[eid] = {
1835                    'experimentID' : \
1836                            [ { 'localname' : eid }, {'fedid': expid } ],
1837                    'experimentStatus': 'starting',
1838                    'experimentAccess': { 'X509' : expcert },
1839                    'owner': fid,
1840                    'log' : [],
1841                }
1842            self.state[expid] = self.state[eid]
1843            if self.state_filename: self.write_state()
1844            self.state_lock.release()
1845        else:
1846            eid = self.exp_stem
1847            for i in range(0,5):
1848                eid += random.choice(string.ascii_letters)
1849            self.state_lock.acquire()
1850            while (self.state.has_key(eid)):
1851                eid = self.exp_stem
1852                for i in range(0,5):
1853                    eid += random.choice(string.ascii_letters)
1854            # Initial state
1855            self.state[eid] = {
1856                    'experimentID' : \
1857                            [ { 'localname' : eid }, {'fedid': expid } ],
1858                    'experimentStatus': 'starting',
1859                    'experimentAccess': { 'X509' : expcert },
1860                    'owner': fid,
1861                    'log' : [],
1862                }
1863            self.state[expid] = self.state[eid]
1864            if self.state_filename: self.write_state()
1865            self.state_lock.release()
1866
1867        try: 
1868            # This catches exceptions to clear the placeholder if necessary
1869            try:
1870                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1871            except ValueError:
1872                raise service_error(service_error.server_config, 
1873                        "Bad key type (%s)" % self.ssh_type)
1874
1875            user = req.get('user', None)
1876            if user == None:
1877                raise service_error(service_error.req, "No user")
1878
1879            master = req.get('master', None)
1880            if not master:
1881                raise service_error(service_error.req,
1882                        "No master testbed label")
1883            export_project = req.get('exportProject', None)
1884            if not export_project:
1885                raise service_error(service_error.req, "No export project")
1886           
1887            if self.splitter_url:
1888                self.log.debug("Calling remote splitter at %s" % \
1889                        self.splitter_url)
1890                split_data = self.remote_splitter(self.splitter_url,
1891                        file_content, master)
1892            else:
1893                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1894                    str(self.muxmax), '-m', master]
1895
1896                if self.fedkit:
1897                    tclcmd.append('-k')
1898
1899                if self.gatewaykit:
1900                    tclcmd.append('-K')
1901
1902                tclcmd.extend([pid, gid, eid, tclfile])
1903
1904                self.log.debug("running local splitter %s", " ".join(tclcmd))
1905                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True)
1906                split_data = tclparser.stdout
1907
1908            allocated = { }         # Testbeds we can access
1909            # Objects to parse the splitter output (defined above)
1910            parse_current_testbed = self.current_testbed(eid, tmpdir,
1911                    self.fedkit, self.gatewaykit)
1912            parse_allbeds = self.allbeds(self.get_access)
1913            parse_gateways = self.gateways(eid, master, tmpdir,
1914                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1915                    self.fedkit)
1916            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1917                        "^#\s+End\s+Vtopo")
1918            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1919                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1920            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1921                    "^#\s+End\s+tarfiles")
1922            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1923                    "^#\s+End\s+rpms")
1924
1925            # Working on the split data
1926            for line in split_data:
1927                line = line.rstrip()
1928                if parse_current_testbed(line, master, allocated, tbparams):
1929                    continue
1930                elif parse_allbeds(line, user, tbparams, master, export_project,
1931                        access_user):
1932                    continue
1933                elif parse_gateways(line, allocated, tbparams):
1934                    continue
1935                elif parse_vtopo(line):
1936                    continue
1937                elif parse_hostnames(line):
1938                    continue
1939                elif parse_tarfiles(line):
1940                    continue
1941                elif parse_rpms(line):
1942                    continue
1943                else:
1944                    raise service_error(service_error.internal, 
1945                            "Bad tcl parse? %s" % line)
1946            # Virtual topology and visualization
1947            vtopo = self.gentopo(parse_vtopo.str)
1948            if not vtopo:
1949                raise service_error(service_error.internal, 
1950                        "Failed to generate virtual topology")
1951
1952            vis = self.genviz(vtopo)
1953            if not vis:
1954                raise service_error(service_error.internal, 
1955                        "Failed to generate visualization")
1956
1957           
1958            # save federant information
1959            for k in allocated.keys():
1960                tbparams[k]['federant'] = {\
1961                        'name': [ { 'localname' : eid} ],\
1962                        'emulab': tbparams[k]['emulab'],\
1963                        'allocID' : tbparams[k]['allocID'],\
1964                        'master' : k == master,\
1965                    }
1966
1967            self.state_lock.acquire()
1968            self.state[eid]['vtopo'] = vtopo
1969            self.state[eid]['vis'] = vis
1970            self.state[expid]['federant'] = \
1971                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1972                        if tbparams[tb].has_key('federant') ]
1973            if self.state_filename: self.write_state()
1974            self.state_lock.release()
1975
1976            # Copy tarfiles and rpms needed at remote sites into a staging area
1977            try:
1978                if self.fedkit:
1979                    for t in self.fedkit:
1980                        parse_tarfiles.list.append(t[1])
1981                if self.gatewaykit:
1982                    for t in self.gatewaykit:
1983                        parse_tarfiles.list.append(t[1])
1984                for t in parse_tarfiles.list:
1985                    if not os.path.exists("%s/tarfiles" % tmpdir):
1986                        os.mkdir("%s/tarfiles" % tmpdir)
1987                    self.copy_file(t, "%s/tarfiles/%s" % \
1988                            (tmpdir, os.path.basename(t)))
1989                for r in parse_rpms.list:
1990                    if not os.path.exists("%s/rpms" % tmpdir):
1991                        os.mkdir("%s/rpms" % tmpdir)
1992                    self.copy_file(r, "%s/rpms/%s" % \
1993                            (tmpdir, os.path.basename(r)))
1994                # A null experiment file in case we need to create a remote
1995                # experiment from scratch
1996                f = open("%s/null.tcl" % tmpdir, "w")
1997                print >>f, """
1998set ns [new Simulator]
1999source tb_compat.tcl
2000
2001set a [$ns node]
2002
2003$ns rtproto Session
2004$ns run
2005"""
2006                f.close()
2007
2008            except IOError, e:
2009                raise service_error(service_error.internal, 
2010                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2011
2012        except service_error, e:
2013            # If something goes wrong in the parse (usually an access error)
2014            # clear the placeholder state.  From here on out the code delays
2015            # exceptions.  Failing at this point returns a fault to the remote
2016            # caller.
2017            self.state_lock.acquire()
2018            del self.state[eid]
2019            del self.state[expid]
2020            if self.state_filename: self.write_state()
2021            self.state_lock.release()
2022            raise e
2023
2024
2025        # Start the background swapper and return the starting state.  From
2026        # here on out, the state will stick around a while.
2027
2028        # Let users touch the state
2029        self.auth.set_attribute(fid, expid)
2030        self.auth.set_attribute(expid, expid)
2031        # Override fedids can manipulate state as well
2032        for o in self.overrides:
2033            self.auth.set_attribute(o, expid)
2034
2035        # Create a logger that logs to the experiment's state object as well as
2036        # to the main log file.
2037
2038        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2039        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2040        # XXX: there should be a global one of these rather than repeating the
2041        # code.
2042        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2043                    '%d %b %y %H:%M:%S'))
2044        alloc_log.addHandler(h)
2045       
2046
2047
2048
2049
2050        # Start a thread to do the resource allocation
2051        t  = Thread(target=self.allocate_resources,
2052                args=(allocated, master, eid, expid, expcert, tbparams, 
2053                    tmpdir, alloc_log),
2054                name=eid)
2055        t.start()
2056
2057        rv = {
2058                'experimentID': [
2059                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2060                ],
2061                'experimentStatus': 'starting',
2062                'experimentAccess': { 'X509' : expcert }
2063            }
2064
2065        return rv
2066
2067    def check_experiment_access(self, fid, key):
2068        """
2069        Confirm that the fid has access to the experiment.  Though a request
2070        may be made in terms of a local name, the access attribute is always
2071        the experiment's fedid.
2072        """
2073        if not isinstance(key, fedid):
2074            self.state_lock.acquire()
2075            if self.state.has_key(key):
2076                if isinstance(self.state[key], dict):
2077                    try:
2078                        kl = [ f['fedid'] for f in \
2079                                self.state[key]['experimentID']\
2080                                    if f.has_key('fedid') ]
2081                    except KeyError:
2082                        self.state_lock.release()
2083                        raise service_error(service_error.internal, 
2084                                "No fedid for experiment %s when checking " +\
2085                                        "access(!?)" % key)
2086                    if len(kl) == 1:
2087                        key = kl[0]
2088                    else:
2089                        self.state_lock.release()
2090                        raise service_error(service_error.internal, 
2091                                "multiple fedids for experiment %s when " +\
2092                                        "checking access(!?)" % key)
2093                elif isinstance(self.state[key], str):
2094                    self.state_lock.release()
2095                    raise service_error(service_error.internal, 
2096                            ("experiment %s is placeholder.  " +\
2097                                    "Creation in progress or aborted oddly") \
2098                                    % key)
2099                else:
2100                    self.state_lock.release()
2101                    raise service_error(service_error.internal, 
2102                            "Unexpected state for %s" % key)
2103
2104            else:
2105                self.state_lock.release()
2106                raise service_error(service_error.access, "Access Denied")
2107            self.state_lock.release()
2108
2109        if self.auth.check_attribute(fid, key):
2110            return True
2111        else:
2112            raise service_error(service_error.access, "Access Denied")
2113
2114
2115
2116    def get_vtopo(self, req, fid):
2117        """
2118        Return the stored virtual topology for this experiment
2119        """
2120        rv = None
2121        state = None
2122
2123        req = req.get('VtopoRequestBody', None)
2124        if not req:
2125            raise service_error(service_error.req,
2126                    "Bad request format (no VtopoRequestBody)")
2127        exp = req.get('experiment', None)
2128        if exp:
2129            if exp.has_key('fedid'):
2130                key = exp['fedid']
2131                keytype = "fedid"
2132            elif exp.has_key('localname'):
2133                key = exp['localname']
2134                keytype = "localname"
2135            else:
2136                raise service_error(service_error.req, "Unknown lookup type")
2137        else:
2138            raise service_error(service_error.req, "No request?")
2139
2140        self.check_experiment_access(fid, key)
2141
2142        self.state_lock.acquire()
2143        if self.state.has_key(key):
2144            if self.state[key].has_key('vtopo'):
2145                rv = { 'experiment' : {keytype: key },\
2146                        'vtopo': self.state[key]['vtopo'],\
2147                    }
2148            else:
2149                state = self.state[key]['experimentStatus']
2150        self.state_lock.release()
2151
2152        if rv: return rv
2153        else: 
2154            if state:
2155                raise service_error(service_error.partial, 
2156                        "Not ready: %s" % state)
2157            else:
2158                raise service_error(service_error.req, "No such experiment")
2159
2160    def get_vis(self, req, fid):
2161        """
2162        Return the stored visualization for this experiment
2163        """
2164        rv = None
2165        state = None
2166
2167        req = req.get('VisRequestBody', None)
2168        if not req:
2169            raise service_error(service_error.req,
2170                    "Bad request format (no VisRequestBody)")
2171        exp = req.get('experiment', None)
2172        if exp:
2173            if exp.has_key('fedid'):
2174                key = exp['fedid']
2175                keytype = "fedid"
2176            elif exp.has_key('localname'):
2177                key = exp['localname']
2178                keytype = "localname"
2179            else:
2180                raise service_error(service_error.req, "Unknown lookup type")
2181        else:
2182            raise service_error(service_error.req, "No request?")
2183
2184        self.check_experiment_access(fid, key)
2185
2186        self.state_lock.acquire()
2187        if self.state.has_key(key):
2188            if self.state[key].has_key('vis'):
2189                rv =  { 'experiment' : {keytype: key },\
2190                        'vis': self.state[key]['vis'],\
2191                        }
2192            else:
2193                state = self.state[key]['experimentStatus']
2194        self.state_lock.release()
2195
2196        if rv: return rv
2197        else:
2198            if state:
2199                raise service_error(service_error.partial, 
2200                        "Not ready: %s" % state)
2201            else:
2202                raise service_error(service_error.req, "No such experiment")
2203
2204    def clean_info_response(self, rv):
2205        """
2206        Remove the information in the experiment's state object that is not in
2207        the info response.
2208        """
2209        # Remove the owner info (should always be there, but...)
2210        if rv.has_key('owner'): del rv['owner']
2211
2212        # Convert the log into the allocationLog parameter and remove the
2213        # log entry (with defensive programming)
2214        if rv.has_key('log'):
2215            rv['allocationLog'] = "".join(rv['log'])
2216            del rv['log']
2217        else:
2218            rv['allocationLog'] = ""
2219
2220        if rv['experimentStatus'] != 'active':
2221            if rv.has_key('federant'): del rv['federant']
2222        else:
2223            # remove the allocationID info from each federant
2224            for f in rv.get('federant', []):
2225                if f.has_key('allocID'): del f['allocID']
2226
2227        return rv
2228
2229    def get_info(self, req, fid):
2230        """
2231        Return all the stored info about this experiment
2232        """
2233        rv = None
2234
2235        req = req.get('InfoRequestBody', None)
2236        if not req:
2237            raise service_error(service_error.req,
2238                    "Bad request format (no InfoRequestBody)")
2239        exp = req.get('experiment', None)
2240        if exp:
2241            if exp.has_key('fedid'):
2242                key = exp['fedid']
2243                keytype = "fedid"
2244            elif exp.has_key('localname'):
2245                key = exp['localname']
2246                keytype = "localname"
2247            else:
2248                raise service_error(service_error.req, "Unknown lookup type")
2249        else:
2250            raise service_error(service_error.req, "No request?")
2251
2252        self.check_experiment_access(fid, key)
2253
2254        # The state may be massaged by the service function that called
2255        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2256        # state.
2257        self.state_lock.acquire()
2258        if self.state.has_key(key):
2259            rv = copy.deepcopy(self.state[key])
2260        self.state_lock.release()
2261
2262        if rv:
2263            return self.clean_info_response(rv)
2264        else:
2265            raise service_error(service_error.req, "No such experiment")
2266
2267    def get_multi_info(self, req, fid):
2268        """
2269        Return all the stored info that this fedid can access
2270        """
2271        rv = { 'info': [ ] }
2272
2273        self.state_lock.acquire()
2274        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2275            self.check_experiment_access(fid, key)
2276
2277            if self.state.has_key(key):
2278                e = copy.deepcopy(self.state[key])
2279                e = self.clean_info_response(e)
2280                rv['info'].append(e)
2281        self.state_lock.release()
2282        return rv
2283
2284
2285    def terminate_experiment(self, req, fid):
2286        """
2287        Swap this experiment out on the federants and delete the shared
2288        information
2289        """
2290        tbparams = { }
2291        req = req.get('TerminateRequestBody', None)
2292        if not req:
2293            raise service_error(service_error.req,
2294                    "Bad request format (no TerminateRequestBody)")
2295        force = req.get('force', False)
2296        exp = req.get('experiment', None)
2297        if exp:
2298            if exp.has_key('fedid'):
2299                key = exp['fedid']
2300                keytype = "fedid"
2301            elif exp.has_key('localname'):
2302                key = exp['localname']
2303                keytype = "localname"
2304            else:
2305                raise service_error(service_error.req, "Unknown lookup type")
2306        else:
2307            raise service_error(service_error.req, "No request?")
2308
2309        self.check_experiment_access(fid, key)
2310
2311        self.state_lock.acquire()
2312        fed_exp = self.state.get(key, None)
2313
2314        if fed_exp:
2315            # This branch of the conditional holds the lock to generate a
2316            # consistent temporary tbparams variable to deallocate experiments.
2317            # It releases the lock to do the deallocations and reacquires it to
2318            # remove the experiment state when the termination is complete.
2319
2320            # First make sure that the experiment creation is complete.
2321            status = fed_exp.get('experimentStatus', None)
2322            if status:
2323                if status == 'starting':
2324                    if not force:
2325                        self.state_lock.release()
2326                        raise service_error(service_error.partial, 
2327                                'Experiment still being created')
2328                    else:
2329                        self.log.warning('Experiment in starting state ' + \
2330                                'being terminated by admin.')
2331            else:
2332                # No status??? trouble
2333                self.state_lock.release()
2334                raise service_error(service_error.internal,
2335                        "Experiment has no status!?")
2336
2337            ids = []
2338            #  experimentID is a list of dicts that are self-describing
2339            #  identifiers.  This finds all the fedids and localnames - the
2340            #  keys of self.state - and puts them into ids.
2341            for id in fed_exp.get('experimentID', []):
2342                if id.has_key('fedid'): ids.append(id['fedid'])
2343                if id.has_key('localname'): ids.append(id['localname'])
2344
2345            # Construct enough of the tbparams to make the stop_segment calls
2346            # work
2347            for fed in fed_exp.get('federant', []):
2348                try:
2349                    for e in fed['name']:
2350                        eid = e.get('localname', None)
2351                        if eid: break
2352                    else:
2353                        continue
2354
2355                    p = fed['emulab']['project']
2356
2357                    project = p['name']['localname']
2358                    tb = p['testbed']['localname']
2359                    user = p['user'][0]['userID']['localname']
2360
2361                    domain = fed['emulab']['domain']
2362                    host  = fed['emulab']['ops']
2363                    aid = fed['allocID']
2364                except KeyError, e:
2365                    continue
2366                tbparams[tb] = {\
2367                        'user': user,\
2368                        'domain': domain,\
2369                        'project': project,\
2370                        'host': host,\
2371                        'eid': eid,\
2372                        'aid': aid,\
2373                    }
2374            self.state_lock.release()
2375
2376            # Stop everyone.
2377            thread_pool = self.thread_pool(self.nthreads)
2378            for tb in tbparams.keys():
2379                # Create and start a thread to stop the segment
2380                thread_pool.wait_for_slot()
2381                t  = self.pooled_thread(\
2382                        target=self.stop_segment(log=self.log,
2383                            keyfile=self.ssh_privkey_file, debug=self.debug), 
2384                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2385                        pdata=thread_pool, trace_file=self.trace_file)
2386                t.start()
2387            # Wait for completions
2388            thread_pool.wait_for_all_done()
2389
2390            # release the allocations (failed experiments have done this
2391            # already, and starting experiments may be in odd states, so we
2392            # ignore errors releasing those allocations
2393            try: 
2394                for tb in tbparams.keys():
2395                    self.release_access(tb, tbparams[tb]['aid'])
2396            except service_error, e:
2397                if status != 'failed' and not force:
2398                    raise e
2399
2400            # Remove the terminated experiment
2401            self.state_lock.acquire()
2402            for id in ids:
2403                if self.state.has_key(id): del self.state[id]
2404
2405            if self.state_filename: self.write_state()
2406            self.state_lock.release()
2407
2408            return { 'experiment': exp }
2409        else:
2410            # Don't forget to release the lock
2411            self.state_lock.release()
2412            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.