source: fedd/federation/experiment_control.py @ a74ea78

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since a74ea78 was a74ea78, checked in by Ted Faber <faber@…>, 15 years ago

Properly avoid terminating a starting experiment (this will have to get an
override) and be more conservative when starting ssh commands.

  • Property mode set to 100644
File size: 88.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254        self.state = { }
255        self.state_lock = Lock()
256        self.tclsh = "/usr/local/bin/otclsh"
257        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
258                config.get("experiment_control", "tcl_splitter",
259                        "/usr/testbed/lib/ns2ir/parse.tcl")
260        mapdb_file = config.get("experiment_control", "mapdb")
261        self.trace_file = sys.stderr
262
263        self.def_expstart = \
264                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
265                "/tmp/federate";
266        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
267                "FEDDIR/hosts";
268        self.def_gwstart = \
269                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
270                "/tmp/bridge.log";
271        self.def_mgwstart = \
272                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
273                "/tmp/bridge.log";
274        self.def_gwimage = "FBSD61-TUNNEL2";
275        self.def_gwtype = "pc";
276        self.local_access = { }
277
278        if auth:
279            self.auth = auth
280        else:
281            self.log.error(\
282                    "[access]: No authorizer initialized, creating local one.")
283            auth = authorizer()
284
285
286        if self.ssh_pubkey_file:
287            try:
288                f = open(self.ssh_pubkey_file, 'r')
289                self.ssh_pubkey = f.read()
290                f.close()
291            except IOError:
292                raise service_error(service_error.internal,
293                        "Cannot read sshpubkey")
294        else:
295            raise service_error(service_error.internal, 
296                    "No SSH public key file?")
297
298        if not self.ssh_privkey_file:
299            raise service_error(service_error.internal, 
300                    "No SSH public key file?")
301
302
303        if mapdb_file:
304            self.read_mapdb(mapdb_file)
305        else:
306            self.log.warn("[experiment_control] No testbed map, using defaults")
307            self.tbmap = { 
308                    'deter':'https://users.isi.deterlab.net:23235',
309                    'emulab':'https://users.isi.deterlab.net:23236',
310                    'ucb':'https://users.isi.deterlab.net:23237',
311                    }
312
313        if accessdb_file:
314                self.read_accessdb(accessdb_file)
315        else:
316            raise service_error(service_error.internal,
317                    "No accessdb specified in config")
318
319        # Grab saved state.  OK to do this w/o locking because it's read only
320        # and only one thread should be in existence that can see self.state at
321        # this point.
322        if self.state_filename:
323            self.read_state()
324
325        # Dispatch tables
326        self.soap_services = {\
327                'Create': soap_handler('Create', self.create_experiment),
328                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
329                'Vis': soap_handler('Vis', self.get_vis),
330                'Info': soap_handler('Info', self.get_info),
331                'Terminate': soap_handler('Terminate', 
332                    self.terminate_experiment),
333        }
334
335        self.xmlrpc_services = {\
336                'Create': xmlrpc_handler('Create', self.create_experiment),
337                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
338                'Vis': xmlrpc_handler('Vis', self.get_vis),
339                'Info': xmlrpc_handler('Info', self.get_info),
340                'Terminate': xmlrpc_handler('Terminate',
341                    self.terminate_experiment),
342        }
343
344    def copy_file(self, src, dest, size=1024):
345        """
346        Exceedingly simple file copy.
347        """
348        s = open(src,'r')
349        d = open(dest, 'w')
350
351        buf = "x"
352        while buf != "":
353            buf = s.read(size)
354            d.write(buf)
355        s.close()
356        d.close()
357
358    # Call while holding self.state_lock
359    def write_state(self):
360        """
361        Write a new copy of experiment state after copying the existing state
362        to a backup.
363
364        State format is a simple pickling of the state dictionary.
365        """
366        if os.access(self.state_filename, os.W_OK):
367            self.copy_file(self.state_filename, \
368                    "%s.bak" % self.state_filename)
369        try:
370            f = open(self.state_filename, 'w')
371            pickle.dump(self.state, f)
372        except IOError, e:
373            self.log.error("Can't write file %s: %s" % \
374                    (self.state_filename, e))
375        except pickle.PicklingError, e:
376            self.log.error("Pickling problem: %s" % e)
377        except TypeError, e:
378            self.log.error("Pickling problem (TypeError): %s" % e)
379
380    # Call while holding self.state_lock
381    def read_state(self):
382        """
383        Read a new copy of experiment state.  Old state is overwritten.
384
385        State format is a simple pickling of the state dictionary.
386        """
387        try:
388            f = open(self.state_filename, "r")
389            self.state = pickle.load(f)
390            self.log.debug("[read_state]: Read state from %s" % \
391                    self.state_filename)
392        except IOError, e:
393            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
394                    % (self.state_filename, e))
395        except pickle.UnpicklingError, e:
396            self.log.warning(("[read_state]: No saved state: " + \
397                    "Unpickling failed: %s") % e)
398       
399        for k in self.state.keys():
400            try:
401                # This list should only have one element in it, but phrasing it
402                # as a for loop doesn't cost much, really.  We have to find the
403                # fedid elements anyway.
404                for eid in [ f['fedid'] \
405                        for f in self.state[k]['experimentID']\
406                            if f.has_key('fedid') ]:
407                    self.auth.set_attribute(self.state[k]['owner'], eid)
408            except KeyError, e:
409                self.log.warning("[read_state]: State ownership or identity " +\
410                        "misformatted in %s: %s" % (self.state_filename, e))
411
412
413    def read_accessdb(self, accessdb_file):
414        """
415        Read the mapping from fedids that can create experiments to their name
416        in the 3-level access namespace.  All will be asserted from this
417        testbed and can include the local username and porject that will be
418        asserted on their behalf by this fedd.  Each fedid is also added to the
419        authorization system with the "create" attribute.
420        """
421        self.accessdb = {}
422        # These are the regexps for parsing the db
423        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
424        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
425                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
426        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
427                "\s*->\s*(" + name_expr + ")\s*$")
428        lineno = 0
429
430        # Parse the mappings and store in self.authdb, a dict of
431        # fedid -> (proj, user)
432        try:
433            f = open(accessdb_file, "r")
434            for line in f:
435                lineno += 1
436                line = line.strip()
437                if len(line) == 0 or line.startswith('#'):
438                    continue
439                m = project_line.match(line)
440                if m:
441                    fid = fedid(hexstr=m.group(1))
442                    project, user = m.group(2,3)
443                    if not self.accessdb.has_key(fid):
444                        self.accessdb[fid] = []
445                    self.accessdb[fid].append((project, user))
446                    continue
447
448                m = user_line.match(line)
449                if m:
450                    fid = fedid(hexstr=m.group(1))
451                    project = None
452                    user = m.group(2)
453                    if not self.accessdb.has_key(fid):
454                        self.accessdb[fid] = []
455                    self.accessdb[fid].append((project, user))
456                    continue
457                self.log.warn("[experiment_control] Error parsing access " +\
458                        "db %s at line %d" %  (accessdb_file, lineno))
459        except IOError:
460            raise service_error(service_error.internal,
461                    "Error opening/reading %s as experiment " +\
462                            "control accessdb" %  accessdb_file)
463        f.close()
464
465        # Initialize the authorization attributes
466        for fid in self.accessdb.keys():
467            self.auth.set_attribute(fid, 'create')
468
469    def read_mapdb(self, file):
470        """
471        Read a simple colon separated list of mappings for the
472        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
473        """
474
475        self.tbmap = { }
476        lineno =0
477        try:
478            f = open(file, "r")
479            for line in f:
480                lineno += 1
481                line = line.strip()
482                if line.startswith('#') or len(line) == 0:
483                    continue
484                try:
485                    label, url = line.split(':', 1)
486                    self.tbmap[label] = url
487                except ValueError, e:
488                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
489                            "map db: %s %s" % (lineno, line, e))
490        except IOError, e:
491            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
492                    "open %s: %s" % (file, e))
493        f.close()
494
495    class emulab_segment:
496        def __init__(self, log=None, keyfile=None, debug=False):
497            self.log = log or logging.getLogger(\
498                    'fedd.experiment_control.emulab_segment')
499            self.ssh_privkey_file = keyfile
500            self.debug = debug
501            self.ssh_exec="/usr/bin/ssh"
502            self.scp_exec = "/usr/bin/scp"
503            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
504
505        def scp_file(self, file, user, host, dest=""):
506            """
507            scp a file to the remote host.  If debug is set the action is only
508            logged.
509            """
510
511            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
512                    '-o', 'StrictHostKeyChecking yes', '-i', 
513                    self.ssh_privkey_file, file, 
514                    "%s@%s:%s" % (user, host, dest)]
515            rv = 0
516
517            try:
518                dnull = open("/dev/null", "w")
519            except IOError:
520                self.log.debug("[ssh_file]: failed to open " + \
521                        "/dev/null for redirect")
522                dnull = Null
523
524            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
525            if not self.debug:
526                rv = call(scp_cmd, stdout=dnull, stderr=dnull)
527
528            return rv == 0
529
530        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
531            """
532            Run a remote command on host as user.  If debug is set, the action
533            is only logged.  Commands are run without stdin, to avoid stray
534            SIGTTINs.
535            """
536            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
537                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
538                    (self.ssh_exec, self.ssh_privkey_file, 
539                            user, host, cmd)
540
541            try:
542                dnull = open("/dev/null", "r")
543            except IOError:
544                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
545                        "for redirect")
546                dnull = Null
547
548            self.log.debug("[ssh_cmd]: %s" % sh_str)
549            if not self.debug:
550                if dnull:
551                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
552                else:
553                    sub = Popen(sh_str, shell=True)
554                if timeout:
555                    i = 0
556                    rv = sub.poll()
557                    while i < timeout:
558                        if rv is not None: break
559                        else:
560                            time.sleep(1)
561                            rv = sub.poll()
562                            i += 1
563                    else:
564                        self.log.debug("Process exceeded runtime: %s" % sh_str)
565                        os.kill(sub.pid, signal.SIGKILL)
566                        raise self.ssh_cmd_timeout();
567                    return rv == 0
568                else:
569                    return sub.wait() == 0
570            else:
571                return True
572
573    class start_segment(emulab_segment):
574        def __init__(self, log=None, keyfile=None, debug=False):
575            experiment_control_local.emulab_segment.__init__(self,
576                    log=log, keyfile=keyfile, debug=debug)
577
578        def create_config_tree(self, src_dir, dest_dir, script):
579            """
580            Append commands to script that will create the directory hierarchy
581            on the remote federant.
582            """
583
584            if os.path.isdir(src_dir):
585                print >>script, "mkdir -p %s" % dest_dir
586                print >>script, "chmod 770 %s" % dest_dir
587
588                for f in os.listdir(src_dir):
589                    if os.path.isdir(f):
590                        self.create_config_tree("%s/%s" % (src_dir, f), 
591                                "%s/%s" % (dest_dir, f), script)
592            else:
593                self.log.debug("[create_config_tree]: Not a directory: %s" \
594                        % src_dir)
595
596        def ship_configs(self, host, user, src_dir, dest_dir):
597            """
598            Copy federant-specific configuration files to the federant.
599            """
600            for f in os.listdir(src_dir):
601                if os.path.isdir(f):
602                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
603                            "%s/%s" % (dest_dir, f)):
604                        return False
605                else:
606                    if not self.scp_file("%s/%s" % (src_dir, f), 
607                            user, host, dest_dir):
608                        return False
609            return True
610
611        def get_state(self, user, host, tb, pid, eid):
612            # command to test experiment state
613            expinfo_exec = "/usr/testbed/bin/expinfo" 
614            # Regular expressions to parse the expinfo response
615            state_re = re.compile("State:\s+(\w+)")
616            no_exp_re = re.compile("^No\s+such\s+experiment")
617            swapping_re = re.compile("^No\s+information\s+available.")
618            state = None    # Experiment state parsed from expinfo
619            # The expinfo ssh command.  Note the identity restriction to use
620            # only the identity provided in the pubkey given.
621            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
622                    'StrictHostKeyChecking yes', '-i', 
623                    self.ssh_privkey_file, "%s@%s" % (user, host), 
624                    expinfo_exec, pid, eid]
625
626            dev_null = None
627            try:
628                dev_null = open("/dev/null", "a")
629            except IOError, e:
630                self.log.error("[get_state]: can't open /dev/null: %s" %e)
631
632            if self.debug:
633                state = 'swapped'
634                rv = 0
635            else:
636                status = Popen(cmd, stdout=PIPE, stderr=dev_null)
637                for line in status.stdout:
638                    m = state_re.match(line)
639                    if m: state = m.group(1)
640                    else:
641                        for reg, st in ((no_exp_re, "none"),
642                                (swapping_re, "swapping")):
643                            m = reg.match(line)
644                            if m: state = st
645                rv = status.wait()
646
647            # If the experiment is not present the subcommand returns a
648            # non-zero return value.  If we successfully parsed a "none"
649            # outcome, ignore the return code.
650            if rv != 0 and state != 'none':
651                raise service_error(service_error.internal,
652                        "Cannot get status of segment %s:%s/%s" % \
653                                (tb, pid, eid))
654            elif state not in ('active', 'swapped', 'swapping', 'none'):
655                raise service_error(service_error.internal,
656                        "Cannot get status of segment %s:%s/%s" % \
657                                (tb, pid, eid))
658            else: return state
659
660
661        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
662            """
663            Start a sub-experiment on a federant.
664
665            Get the current state, modify or create as appropriate, ship data
666            and configs and start the experiment.  There are small ordering
667            differences based on the initial state of the sub-experiment.
668            """
669            # ops node in the federant
670            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
671            user = tbparams[tb]['user']     # federant user
672            pid = tbparams[tb]['project']   # federant project
673            # XXX
674            base_confs = ( "hosts",)
675            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
676            # Configuration directories on the remote machine
677            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
678            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
679            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
680
681            state = self.get_state(user, host, tb, pid, eid)
682
683            self.log.debug("[start_segment]: %s: %s" % (tb, state))
684            self.log.info("[start_segment]:transferring experiment to %s" % tb)
685
686            if not self.scp_file("%s/%s/%s" % \
687                    (tmpdir, tb, tclfile), user, host):
688                return False
689           
690            if state == 'none':
691                # Create a null copy of the experiment so that we capture any
692                # logs there if the modify fails.  Emulab software discards the
693                # logs from a failed startexp
694                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
695                    return False
696                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
697                timedout = False
698                try:
699                    if not self.ssh_cmd(user, host,
700                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
701                            "-e %s null.tcl") % (pid, eid), "startexp",
702                            timeout=60 * 10):
703                        return False
704                except self.ssh_cmd_timeout:
705                    timedout = True
706
707                if timedout:
708                    state = self.get_state(user, host, self.ssh_privkey_file, 
709                            tb, eid, pid)
710                    if state != "swapped":
711                        return False
712
713           
714            # Open up a temporary file to contain a script for setting up the
715            # filespace for the new experiment.
716            self.log.info("[start_segment]: creating script file")
717            try:
718                sf, scriptname = tempfile.mkstemp()
719                scriptfile = os.fdopen(sf, 'w')
720            except IOError:
721                return False
722
723            scriptbase = os.path.basename(scriptname)
724
725            # Script the filesystem changes
726            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
727            # Clear and create the tarfiles and rpm directories
728            for d in (tarfiles_dir, rpms_dir):
729                print >>scriptfile, "/bin/rm -rf %s/*" % d
730                print >>scriptfile, "mkdir -p %s" % d
731            print >>scriptfile, 'mkdir -p %s' % proj_dir
732            self.create_config_tree("%s/%s" % (tmpdir, tb),
733                    proj_dir, scriptfile)
734            if os.path.isdir("%s/tarfiles" % tmpdir):
735                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
736                        scriptfile)
737            if os.path.isdir("%s/rpms" % tmpdir):
738                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
739                        scriptfile)
740            print >>scriptfile, "rm -f %s" % scriptbase
741            scriptfile.close()
742
743            # Move the script to the remote machine
744            # XXX: could collide tempfile names on the remote host
745            if self.scp_file(scriptname, user, host, scriptbase):
746                os.remove(scriptname)
747            else:
748                return False
749
750            # Execute the script (and the script's last line deletes it)
751            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
752                return False
753
754            for f in base_confs:
755                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
756                        "%s/%s" % (proj_dir, f)):
757                    return False
758            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
759                    proj_dir):
760                return False
761            if os.path.isdir("%s/tarfiles" % tmpdir):
762                if not self.ship_configs(host, user,
763                        "%s/tarfiles" % tmpdir, tarfiles_dir):
764                    return False
765            if os.path.isdir("%s/rpms" % tmpdir):
766                if not self.ship_configs(host, user,
767                        "%s/rpms" % tmpdir, tarfiles_dir):
768                    return False
769            # Stage the new configuration (active experiments will stay swapped
770            # in now)
771            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
772            try:
773                if not self.ssh_cmd(user, host,
774                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
775                                (pid, eid, tclfile),
776                        "modexp", timeout= 60 * 10):
777                    return False
778            except self.ssh_cmd_timeout:
779                print "modexp timeout"
780                # There's really no way to see if this succeeded or failed, so
781                # if it hangs, assume the worst.
782                return False
783            # Active experiments are still swapped, this swaps the others in.
784            if state != 'active':
785                self.log.info("[start_segment]: Swapping %s in on %s" % \
786                        (eid, tb))
787                timedout = False
788                try:
789                    if not self.ssh_cmd(user, host,
790                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
791                            "swapexp", timeout=10*60):
792                        return False
793                except self.ssh_cmd_timeout:
794                    timedout = True
795               
796                # If the command was terminated, but completed successfully,
797                # report success.
798                if timedout:
799                    state = self.get_state(user, host, self.ssh_privkey_file,
800                            tb, eid, pid)
801                    self.log.debug("[start_segment]: swapin timed out (state)")
802                    return state == 'active'
803            # Everything has gone OK.
804            return True
805
806    class stop_segment(emulab_segment):
807        def __init__(self, log=None, keyfile=None, debug=False):
808            experiment_control_local.emulab_segment.__init__(self,
809                    log=log, keyfile=keyfile, debug=debug)
810
811        def __call__(self, tb, eid, tbparams):
812            """
813            Stop a sub experiment by calling swapexp on the federant
814            """
815            user = tbparams[tb]['user']
816            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
817            pid = tbparams[tb]['project']
818
819            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
820            return self.ssh_cmd(user, host,
821                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
822
823       
824    def generate_ssh_keys(self, dest, type="rsa" ):
825        """
826        Generate a set of keys for the gateways to use to talk.
827
828        Keys are of type type and are stored in the required dest file.
829        """
830        valid_types = ("rsa", "dsa")
831        t = type.lower();
832        if t not in valid_types: raise ValueError
833        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
834
835        try:
836            trace = open("/dev/null", "w")
837        except IOError:
838            raise service_error(service_error.internal,
839                    "Cannot open /dev/null??");
840
841        # May raise CalledProcessError
842        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
843        rv = call(cmd, stdout=trace, stderr=trace)
844        if rv != 0:
845            raise service_error(service_error.internal, 
846                    "Cannot generate nonce ssh keys.  %s return code %d" \
847                            % (self.ssh_keygen, rv))
848
849    def gentopo(self, str):
850        """
851        Generate the topology dtat structure from the splitter's XML
852        representation of it.
853
854        The topology XML looks like:
855            <experiment>
856                <nodes>
857                    <node><vname></vname><ips>ip1:ip2</ips></node>
858                </nodes>
859                <lans>
860                    <lan>
861                        <vname></vname><vnode></vnode><ip></ip>
862                        <bandwidth></bandwidth><member>node:port</member>
863                    </lan>
864                </lans>
865        """
866        class topo_parse:
867            """
868            Parse the topology XML and create the dats structure.
869            """
870            def __init__(self):
871                # Typing of the subelements for data conversion
872                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
873                self.int_subelements = ( 'bandwidth',)
874                self.float_subelements = ( 'delay',)
875                # The final data structure
876                self.nodes = [ ]
877                self.lans =  [ ]
878                self.topo = { \
879                        'node': self.nodes,\
880                        'lan' : self.lans,\
881                    }
882                self.element = { }  # Current element being created
883                self.chars = ""     # Last text seen
884
885            def end_element(self, name):
886                # After each sub element the contents is added to the current
887                # element or to the appropriate list.
888                if name == 'node':
889                    self.nodes.append(self.element)
890                    self.element = { }
891                elif name == 'lan':
892                    self.lans.append(self.element)
893                    self.element = { }
894                elif name in self.str_subelements:
895                    self.element[name] = self.chars
896                    self.chars = ""
897                elif name in self.int_subelements:
898                    self.element[name] = int(self.chars)
899                    self.chars = ""
900                elif name in self.float_subelements:
901                    self.element[name] = float(self.chars)
902                    self.chars = ""
903
904            def found_chars(self, data):
905                self.chars += data.rstrip()
906
907
908        tp = topo_parse();
909        parser = xml.parsers.expat.ParserCreate()
910        parser.EndElementHandler = tp.end_element
911        parser.CharacterDataHandler = tp.found_chars
912
913        parser.Parse(str)
914
915        return tp.topo
916       
917
918    def genviz(self, topo):
919        """
920        Generate the visualization the virtual topology
921        """
922
923        neato = "/usr/local/bin/neato"
924        # These are used to parse neato output and to create the visualization
925        # file.
926        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
927        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
928                "%s</type></node>"
929
930        try:
931            # Node names
932            nodes = [ n['vname'] for n in topo['node'] ]
933            topo_lans = topo['lan']
934        except KeyError:
935            raise service_error(service_error.internal, "Bad topology")
936
937        lans = { }
938        links = { }
939
940        # Walk through the virtual topology, organizing the connections into
941        # 2-node connections (links) and more-than-2-node connections (lans).
942        # When a lan is created, it's added to the list of nodes (there's a
943        # node in the visualization for the lan).
944        for l in topo_lans:
945            if links.has_key(l['vname']):
946                if len(links[l['vname']]) < 2:
947                    links[l['vname']].append(l['vnode'])
948                else:
949                    nodes.append(l['vname'])
950                    lans[l['vname']] = links[l['vname']]
951                    del links[l['vname']]
952                    lans[l['vname']].append(l['vnode'])
953            elif lans.has_key(l['vname']):
954                lans[l['vname']].append(l['vnode'])
955            else:
956                links[l['vname']] = [ l['vnode'] ]
957
958
959        # Open up a temporary file for dot to turn into a visualization
960        try:
961            df, dotname = tempfile.mkstemp()
962            dotfile = os.fdopen(df, 'w')
963        except IOError:
964            raise service_error(service_error.internal,
965                    "Failed to open file in genviz")
966
967        # Generate a dot/neato input file from the links, nodes and lans
968        try:
969            print >>dotfile, "graph G {"
970            for n in nodes:
971                print >>dotfile, '\t"%s"' % n
972            for l in links.keys():
973                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
974            for l in lans.keys():
975                for n in lans[l]:
976                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
977            print >>dotfile, "}"
978            dotfile.close()
979        except TypeError:
980            raise service_error(service_error.internal,
981                    "Single endpoint link in vtopo")
982        except IOError:
983            raise service_error(service_error.internal, "Cannot write dot file")
984
985        # Use dot to create a visualization
986        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
987                '-Gpack=true', dotname], stdout=PIPE)
988
989        # Translate dot to vis format
990        vis_nodes = [ ]
991        vis = { 'node': vis_nodes }
992        for line in dot.stdout:
993            m = vis_re.match(line)
994            if m:
995                vn = m.group(1)
996                vis_node = {'name': vn, \
997                        'x': float(m.group(2)),\
998                        'y' : float(m.group(3)),\
999                    }
1000                if vn in links.keys() or vn in lans.keys():
1001                    vis_node['type'] = 'lan'
1002                else:
1003                    vis_node['type'] = 'node'
1004                vis_nodes.append(vis_node)
1005        rv = dot.wait()
1006
1007        os.remove(dotname)
1008        if rv == 0 : return vis
1009        else: return None
1010
1011    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1012            access_user):
1013        """
1014        Get access to testbed through fedd and set the parameters for that tb
1015        """
1016        uri = self.tbmap.get(tb, None)
1017        if not uri:
1018            raise service_error(serice_error.server_config, 
1019                    "Unknown testbed: %s" % tb)
1020
1021        # currently this lumps all users into one service access group
1022        service_keys = [ a for u in user \
1023                for a in u.get('access', []) \
1024                    if a.has_key('sshPubkey')]
1025
1026        if len(service_keys) == 0:
1027            raise service_error(service_error.req, 
1028                    "Must have at least one SSH pubkey for services")
1029
1030
1031        for p, u in access_user:
1032            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1033                    "to %s") %  ((p or "None"), u, uri))
1034
1035            if p:
1036                # Request with user and project specified
1037                req = {\
1038                        'destinationTestbed' : { 'uri' : uri },
1039                        'project': { 
1040                            'name': {'localname': p},
1041                            'user': [ {'userID': { 'localname': u } } ],
1042                            },
1043                        'user':  user,
1044                        'allocID' : { 'localname': 'test' },
1045                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1046                        'serviceAccess' : service_keys
1047                    }
1048            else:
1049                # Request with only user specified
1050                req = {\
1051                        'destinationTestbed' : { 'uri' : uri },
1052                        'user':  [ {'userID': { 'localname': u } } ],
1053                        'allocID' : { 'localname': 'test' },
1054                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1055                        'serviceAccess' : service_keys
1056                    }
1057
1058            if tb == master:
1059                # NB, the export_project parameter is a dict that includes
1060                # the type
1061                req['exportProject'] = export_project
1062
1063            # node resources if any
1064            if nodes != None and len(nodes) > 0:
1065                rnodes = [ ]
1066                for n in nodes:
1067                    rn = { }
1068                    image, hw, count = n.split(":")
1069                    if image: rn['image'] = [ image ]
1070                    if hw: rn['hardware'] = [ hw ]
1071                    if count and int(count) >0 : rn['count'] = int(count)
1072                    rnodes.append(rn)
1073                req['resources']= { }
1074                req['resources']['node'] = rnodes
1075
1076            try:
1077                if self.local_access.has_key(uri):
1078                    # Local access call
1079                    req = { 'RequestAccessRequestBody' : req }
1080                    r = self.local_access[uri].RequestAccess(req, 
1081                            fedid(file=self.cert_file))
1082                    r = { 'RequestAccessResponseBody' : r }
1083                else:
1084                    r = self.call_RequestAccess(uri, req, 
1085                            self.cert_file, self.cert_pwd, self.trusted_certs)
1086            except service_error, e:
1087                if e.code == service_error.access:
1088                    self.log.debug("[get_access] Access denied")
1089                    r = None
1090                    continue
1091                else:
1092                    raise e
1093
1094            if r.has_key('RequestAccessResponseBody'):
1095                # Through to here we have a valid response, not a fault.
1096                # Access denied is a fault, so something better or worse than
1097                # access denied has happened.
1098                r = r['RequestAccessResponseBody']
1099                self.log.debug("[get_access] Access granted")
1100                break
1101            else:
1102                raise service_error(service_error.protocol,
1103                        "Bad proxy response")
1104       
1105        if not r:
1106            raise service_error(service_error.access, 
1107                    "Access denied by %s (%s)" % (tb, uri))
1108
1109        e = r['emulab']
1110        p = e['project']
1111        tbparam[tb] = { 
1112                "boss": e['boss'],
1113                "host": e['ops'],
1114                "domain": e['domain'],
1115                "fs": e['fileServer'],
1116                "eventserver": e['eventServer'],
1117                "project": unpack_id(p['name']),
1118                "emulab" : e,
1119                "allocID" : r['allocID'],
1120                }
1121        # Make the testbed name be the label the user applied
1122        p['testbed'] = {'localname': tb }
1123
1124        for u in p['user']:
1125            role = u.get('role', None)
1126            if role == 'experimentCreation':
1127                tbparam[tb]['user'] = unpack_id(u['userID'])
1128                break
1129        else:
1130            raise service_error(service_error.internal, 
1131                    "No createExperimentUser from %s" %tb)
1132
1133        # Add attributes to barameter space.  We don't allow attributes to
1134        # overlay any parameters already installed.
1135        for a in e['fedAttr']:
1136            try:
1137                if a['attribute'] and isinstance(a['attribute'], basestring)\
1138                        and not tbparam[tb].has_key(a['attribute'].lower()):
1139                    tbparam[tb][a['attribute'].lower()] = a['value']
1140            except KeyError:
1141                self.log.error("Bad attribute in response: %s" % a)
1142       
1143    def release_access(self, tb, aid):
1144        """
1145        Release access to testbed through fedd
1146        """
1147
1148        uri = self.tbmap.get(tb, None)
1149        if not uri:
1150            raise service_error(serice_error.server_config, 
1151                    "Unknown testbed: %s" % tb)
1152
1153        if self.local_access.has_key(uri):
1154            resp = self.local_access[uri].ReleaseAccess(\
1155                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1156                    fedid(file=self.cert_file))
1157            resp = { 'ReleaseAccessResponseBody': resp } 
1158        else:
1159            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1160                    self.cert_file, self.cert_pwd, self.trusted_certs)
1161
1162        # better error coding
1163
1164    def remote_splitter(self, uri, desc, master):
1165
1166        req = {
1167                'description' : { 'ns2description': desc },
1168                'master': master,
1169                'include_fedkit': bool(self.fedkit),
1170                'include_gatewaykit': bool(self.gatewaykit)
1171            }
1172
1173        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1174                self.trusted_certs)
1175
1176        if r.has_key('Ns2SplitResponseBody'):
1177            r = r['Ns2SplitResponseBody']
1178            if r.has_key('output'):
1179                return r['output'].splitlines()
1180            else:
1181                raise service_error(service_error.protocol, 
1182                        "Bad splitter response (no output)")
1183        else:
1184            raise service_error(service_error.protocol, "Bad splitter response")
1185       
1186    class current_testbed:
1187        """
1188        Object for collecting the current testbed description.  The testbed
1189        description is saved to a file with the local testbed variables
1190        subsittuted line by line.
1191        """
1192        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1193            def tar_list_to_string(tl):
1194                if tl is None: return None
1195
1196                rv = ""
1197                for t in tl:
1198                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1199                            (t[0], os.path.basename(t[1]))
1200                return rv
1201
1202
1203            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1204            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1205            self.current_testbed = None
1206            self.testbed_file = None
1207
1208            self.def_expstart = \
1209                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1210            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1211            self.def_gwstart = \
1212                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1213            self.def_mgwstart = \
1214                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1215            self.def_gwimage = "FBSD61-TUNNEL2";
1216            self.def_gwtype = "pc";
1217            self.def_mgwcmd = '# '
1218            self.def_mgwcmdparams = ''
1219            self.def_gwcmd = '# '
1220            self.def_gwcmdparams = ''
1221
1222            self.eid = eid
1223            self.tmpdir = tmpdir
1224            # Convert fedkit and gateway kit (which are lists of tuples) into a
1225            # substituition string.
1226            self.fedkit = tar_list_to_string(fedkit)
1227            self.gatewaykit = tar_list_to_string(gatewaykit)
1228
1229        def __call__(self, line, master, allocated, tbparams):
1230            # Capture testbed topology descriptions
1231            if self.current_testbed == None:
1232                m = self.begin_testbed.match(line)
1233                if m != None:
1234                    self.current_testbed = m.group(1)
1235                    if self.current_testbed == None:
1236                        raise service_error(service_error.req,
1237                                "Bad request format (unnamed testbed)")
1238                    allocated[self.current_testbed] = \
1239                            allocated.get(self.current_testbed,0) + 1
1240                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1241                    if not os.path.exists(tb_dir):
1242                        try:
1243                            os.mkdir(tb_dir)
1244                        except IOError:
1245                            raise service_error(service_error.internal,
1246                                    "Cannot create %s" % tb_dir)
1247                    try:
1248                        self.testbed_file = open("%s/%s.%s.tcl" %
1249                                (tb_dir, self.eid, self.current_testbed), 'w')
1250                    except IOError:
1251                        self.testbed_file = None
1252                    return True
1253                else: return False
1254            else:
1255                m = self.end_testbed.match(line)
1256                if m != None:
1257                    if m.group(1) != self.current_testbed:
1258                        raise service_error(service_error.internal, 
1259                                "Mismatched testbed markers!?")
1260                    if self.testbed_file != None: 
1261                        self.testbed_file.close()
1262                        self.testbed_file = None
1263                    self.current_testbed = None
1264                elif self.testbed_file:
1265                    # Substitute variables and put the line into the local
1266                    # testbed file.
1267                    gwtype = tbparams[self.current_testbed].get(\
1268                            'connectortype', self.def_gwtype)
1269                    gwimage = tbparams[self.current_testbed].get(\
1270                            'connectorimage', self.def_gwimage)
1271                    mgwstart = tbparams[self.current_testbed].get(\
1272                            'masterconnectorstartcmd', self.def_mgwstart)
1273                    mexpstart = tbparams[self.current_testbed].get(\
1274                            'masternodestartcmd', self.def_mexpstart)
1275                    gwstart = tbparams[self.current_testbed].get(\
1276                            'slaveconnectorstartcmd', self.def_gwstart)
1277                    expstart = tbparams[self.current_testbed].get(\
1278                            'slavenodestartcmd', self.def_expstart)
1279                    project = tbparams[self.current_testbed].get('project')
1280                    gwcmd = tbparams[self.current_testbed].get(\
1281                            'slaveconnectorcmd', self.def_gwcmd)
1282                    gwcmdparams = tbparams[self.current_testbed].get(\
1283                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1284                    mgwcmd = tbparams[self.current_testbed].get(\
1285                            'masterconnectorcmd', self.def_gwcmd)
1286                    mgwcmdparams = tbparams[self.current_testbed].get(\
1287                            'masterconnectorcmdparams', self.def_gwcmdparams)
1288                    line = re.sub("GWTYPE", gwtype, line)
1289                    line = re.sub("GWIMAGE", gwimage, line)
1290                    if self.current_testbed == master:
1291                        line = re.sub("GWSTART", mgwstart, line)
1292                        line = re.sub("EXPSTART", mexpstart, line)
1293                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1294                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1295                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1296                    else:
1297                        line = re.sub("GWSTART", gwstart, line)
1298                        line = re.sub("EXPSTART", expstart, line)
1299                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1300                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1301                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1302                    #These expansions contain EID and PROJDIR.  NB these are
1303                    # local fedkit and gatewaykit, which are strings.
1304                    if self.fedkit:
1305                        line = re.sub("FEDKIT", self.fedkit, line)
1306                    if self.gatewaykit:
1307                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1308                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1309                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1310                    line = re.sub("EID", self.eid, line)
1311                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1312                            (project, self.eid), line)
1313                    print >>self.testbed_file, line
1314                return True
1315
1316    class allbeds:
1317        """
1318        Process the Allbeds section.  Get access to each federant and save the
1319        parameters in tbparams
1320        """
1321        def __init__(self, get_access):
1322            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1323            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1324            self.in_allbeds = False
1325            self.get_access = get_access
1326
1327        def __call__(self, line, user, tbparams, master, export_project,
1328                access_user):
1329            # Testbed access parameters
1330            if not self.in_allbeds:
1331                if self.begin_allbeds.match(line):
1332                    self.in_allbeds = True
1333                    return True
1334                else:
1335                    return False
1336            else:
1337                if self.end_allbeds.match(line):
1338                    self.in_allbeds = False
1339                else:
1340                    nodes = line.split('|')
1341                    tb = nodes.pop(0)
1342                    self.get_access(tb, nodes, user, tbparams, master,
1343                            export_project, access_user)
1344                return True
1345
1346    class gateways:
1347        def __init__(self, eid, master, tmpdir, gw_pubkey,
1348                gw_secretkey, copy_file, fedkit):
1349            self.begin_gateways = \
1350                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1351            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1352            self.current_gateways = None
1353            self.control_gateway = None
1354            self.active_end = { }
1355
1356            self.eid = eid
1357            self.master = master
1358            self.tmpdir = tmpdir
1359            self.gw_pubkey_base = gw_pubkey
1360            self.gw_secretkey_base = gw_secretkey
1361
1362            self.copy_file = copy_file
1363            self.fedkit = fedkit
1364
1365
1366        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1367                active_end, tbparams, dtb, myname, desthost, type):
1368            """
1369            Produce a gateway configuration file from a gateways line.
1370            """
1371
1372            sproject = tbparams[gw].get('project', 'project')
1373            dproject = tbparams[dtb].get('project', 'project')
1374            sdomain = ".%s.%s%s" % (eid, sproject,
1375                    tbparams[gw].get('domain', ".example.com"))
1376            ddomain = ".%s.%s%s" % (eid, dproject,
1377                    tbparams[dtb].get('domain', ".example.com"))
1378            boss = tbparams[master].get('boss', "boss")
1379            fs = tbparams[master].get('fs', "fs")
1380            event_server = "%s%s" % \
1381                    (tbparams[gw].get('eventserver', "event_server"),
1382                            tbparams[gw].get('domain', "example.com"))
1383            remote_event_server = "%s%s" % \
1384                    (tbparams[dtb].get('eventserver', "event_server"),
1385                            tbparams[dtb].get('domain', "example.com"))
1386            seer_control = "%s%s" % \
1387                    (tbparams[gw].get('control', "control"), sdomain)
1388            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1389
1390            if self.fedkit:
1391                remote_script_dir = "/usr/local/federation/bin"
1392                local_script_dir = "/usr/local/federation/bin"
1393            else:
1394                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1395                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1396
1397            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1398            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1399            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1400
1401            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1402            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1403
1404            # translate to lower case so the `hostname` hack for specifying
1405            # configuration files works.
1406            conf_file = conf_file.lower();
1407            remote_conf_file = remote_conf_file.lower();
1408
1409            if dtb == master:
1410                active = "false"
1411            elif gw == master:
1412                active = "true"
1413            elif active_end.has_key('%s-%s' % (dtb, gw)):
1414                active = "false"
1415            else:
1416                active_end['%s-%s' % (gw, dtb)] = 1
1417                active = "true"
1418
1419            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1420            print >>gwconfig, "Active: %s" % active
1421            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1422            if tunnel_iface:
1423                print >>gwconfig, "Interface: %s" % tunnel_iface
1424            print >>gwconfig, "BossName: %s" % boss
1425            print >>gwconfig, "FsName: %s" % fs
1426            print >>gwconfig, "EventServerName: %s" % event_server
1427            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1428            print >>gwconfig, "SeerControl: %s" % seer_control
1429            print >>gwconfig, "Type: %s" % type
1430            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1431            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1432                    local_script_dir
1433            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1434            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1435            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1436                    (remote_conf_dir, remote_conf_file)
1437            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1438            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1439            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1440            gwconfig.close()
1441
1442            return active == "true"
1443
1444        def __call__(self, line, allocated, tbparams):
1445            # Process gateways
1446            if not self.current_gateways:
1447                m = self.begin_gateways.match(line)
1448                if m:
1449                    self.current_gateways = m.group(1)
1450                    if allocated.has_key(self.current_gateways):
1451                        # This test should always succeed
1452                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1453                        if not os.path.exists(tb_dir):
1454                            try:
1455                                os.mkdir(tb_dir)
1456                            except IOError:
1457                                raise service_error(service_error.internal,
1458                                        "Cannot create %s" % tb_dir)
1459                    else:
1460                        # XXX
1461                        self.log.error("[gateways]: Ignoring gateways for " + \
1462                                "unknown testbed %s" % self.current_gateways)
1463                        self.current_gateways = None
1464                    return True
1465                else:
1466                    return False
1467            else:
1468                m = self.end_gateways.match(line)
1469                if m :
1470                    if m.group(1) != self.current_gateways:
1471                        raise service_error(service_error.internal,
1472                                "Mismatched gateway markers!?")
1473                    if self.control_gateway:
1474                        try:
1475                            cc = open("%s/%s/client.conf" %
1476                                    (self.tmpdir, self.current_gateways), 'w')
1477                            print >>cc, "ControlGateway: %s" % \
1478                                    self.control_gateway
1479                            if tbparams[self.master].has_key('smbshare'):
1480                                print >>cc, "SMBSHare: %s" % \
1481                                        tbparams[self.master]['smbshare']
1482                            print >>cc, "ProjectUser: %s" % \
1483                                    tbparams[self.master]['user']
1484                            print >>cc, "ProjectName: %s" % \
1485                                    tbparams[self.master]['project']
1486                            print >>cc, "ExperimentID: %s/%s" % \
1487                                    ( tbparams[self.master]['project'], \
1488                                    self.eid )
1489                            cc.close()
1490                        except IOError:
1491                            raise service_error(service_error.internal,
1492                                    "Error creating client config")
1493                        # XXX: This seer specific file should disappear
1494                        try:
1495                            cc = open("%s/%s/seer.conf" %
1496                                    (self.tmpdir, self.current_gateways),
1497                                    'w')
1498                            if self.current_gateways != self.master:
1499                                print >>cc, "ControlNode: %s" % \
1500                                        self.control_gateway
1501                            print >>cc, "ExperimentID: %s/%s" % \
1502                                    ( tbparams[self.master]['project'], \
1503                                    self.eid )
1504                            cc.close()
1505                        except IOError:
1506                            raise service_error(service_error.internal,
1507                                    "Error creating seer config")
1508                    else:
1509                        debug.error("[gateways]: No control gateway for %s" %\
1510                                    self.current_gateways)
1511                    self.current_gateways = None
1512                else:
1513                    dtb, myname, desthost, type = line.split(" ")
1514
1515                    if type == "control" or type == "both":
1516                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1517                                self.eid, 
1518                                tbparams[self.current_gateways]['project'],
1519                                tbparams[self.current_gateways]['domain'])
1520                    try:
1521                        active = self.gateway_conf_file(self.current_gateways,
1522                                self.master, self.eid, self.gw_pubkey_base,
1523                                self.gw_secretkey_base,
1524                                self.active_end, tbparams, dtb, myname,
1525                                desthost, type)
1526                    except IOError, e:
1527                        raise service_error(service_error.internal,
1528                                "Failed to write config file for %s" % \
1529                                        self.current_gateway)
1530           
1531                    gw_pubkey = "%s/keys/%s" % \
1532                            (self.tmpdir, self.gw_pubkey_base)
1533                    gw_secretkey = "%s/keys/%s" % \
1534                            (self.tmpdir, self.gw_secretkey_base)
1535
1536                    pkfile = "%s/%s/%s" % \
1537                            ( self.tmpdir, self.current_gateways, 
1538                                    self.gw_pubkey_base)
1539                    skfile = "%s/%s/%s" % \
1540                            ( self.tmpdir, self.current_gateways, 
1541                                    self.gw_secretkey_base)
1542
1543                    if not os.path.exists(pkfile):
1544                        try:
1545                            self.copy_file(gw_pubkey, pkfile)
1546                        except IOError:
1547                            service_error(service_error.internal,
1548                                    "Failed to copy pubkey file")
1549
1550                    if active and not os.path.exists(skfile):
1551                        try:
1552                            self.copy_file(gw_secretkey, skfile)
1553                        except IOError:
1554                            service_error(service_error.internal,
1555                                    "Failed to copy secretkey file")
1556                return True
1557
1558    class shunt_to_file:
1559        """
1560        Simple class to write data between two regexps to a file.
1561        """
1562        def __init__(self, begin, end, filename):
1563            """
1564            Begin shunting on a match of begin, stop on end, send data to
1565            filename.
1566            """
1567            self.begin = re.compile(begin)
1568            self.end = re.compile(end)
1569            self.in_shunt = False
1570            self.file = None
1571            self.filename = filename
1572
1573        def __call__(self, line):
1574            """
1575            Call this on each line in the input that may be shunted.
1576            """
1577            if not self.in_shunt:
1578                if self.begin.match(line):
1579                    self.in_shunt = True
1580                    try:
1581                        self.file = open(self.filename, "w")
1582                    except:
1583                        self.file = None
1584                        raise
1585                    return True
1586                else:
1587                    return False
1588            else:
1589                if self.end.match(line):
1590                    if self.file: 
1591                        self.file.close()
1592                        self.file = None
1593                    self.in_shunt = False
1594                else:
1595                    if self.file:
1596                        print >>self.file, line
1597                return True
1598
1599    class shunt_to_list:
1600        """
1601        Same interface as shunt_to_file.  Data collected in self.list, one list
1602        element per line.
1603        """
1604        def __init__(self, begin, end):
1605            self.begin = re.compile(begin)
1606            self.end = re.compile(end)
1607            self.in_shunt = False
1608            self.list = [ ]
1609       
1610        def __call__(self, line):
1611            if not self.in_shunt:
1612                if self.begin.match(line):
1613                    self.in_shunt = True
1614                    return True
1615                else:
1616                    return False
1617            else:
1618                if self.end.match(line):
1619                    self.in_shunt = False
1620                else:
1621                    self.list.append(line)
1622                return True
1623
1624    class shunt_to_string:
1625        """
1626        Same interface as shunt_to_file.  Data collected in self.str, all in
1627        one string.
1628        """
1629        def __init__(self, begin, end):
1630            self.begin = re.compile(begin)
1631            self.end = re.compile(end)
1632            self.in_shunt = False
1633            self.str = ""
1634       
1635        def __call__(self, line):
1636            if not self.in_shunt:
1637                if self.begin.match(line):
1638                    self.in_shunt = True
1639                    return True
1640                else:
1641                    return False
1642            else:
1643                if self.end.match(line):
1644                    self.in_shunt = False
1645                else:
1646                    self.str += line
1647                return True
1648
1649    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1650            tbparams, tmpdir, alloc_log=None):
1651        started = { }           # Testbeds where a sub-experiment started
1652                                # successfully
1653
1654        # XXX
1655        fail_soft = False
1656
1657        log = alloc_log or self.log
1658
1659        thread_pool = self.thread_pool(self.nthreads)
1660        threads = [ ]
1661
1662        for tb in [ k for k in allocated.keys() if k != master]:
1663            # Create and start a thread to start the segment, and save it to
1664            # get the return value later
1665            thread_pool.wait_for_slot()
1666            t  = self.pooled_thread(\
1667                    target=self.start_segment(log=log,
1668                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1669                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1670                    pdata=thread_pool, trace_file=self.trace_file)
1671            threads.append(t)
1672            t.start()
1673
1674        # Wait until all finish
1675        thread_pool.wait_for_all_done()
1676
1677        # If none failed, start the master
1678        failed = [ t.getName() for t in threads if not t.rv ]
1679
1680        if len(failed) == 0:
1681            starter = self.start_segment(log=log, 
1682                    keyfile=self.ssh_privkey_file, debug=self.debug)
1683            if not starter(master, eid, tbparams, tmpdir):
1684                failed.append(master)
1685
1686        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1687        # If one failed clean up, unless fail_soft is set
1688        if failed:
1689            if not fail_soft:
1690                thread_pool.clear()
1691                for tb in succeeded:
1692                    # Create and start a thread to stop the segment
1693                    thread_pool.wait_for_slot()
1694                    t  = self.pooled_thread(\
1695                            target=self.stop_segment(log=log,
1696                                keyfile=self.ssh_privkey_file,
1697                                debug=self.debug), 
1698                            args=(tb, eid, tbparams), name=tb,
1699                            pdata=thread_pool, trace_file=self.trace_file)
1700                    t.start()
1701                # Wait until all finish
1702                thread_pool.wait_for_all_done()
1703
1704                # release the allocations
1705                for tb in tbparams.keys():
1706                    self.release_access(tb, tbparams[tb]['allocID'])
1707                # Remove the placeholder
1708                self.state_lock.acquire()
1709                self.state[eid]['experimentStatus'] = 'failed'
1710                if self.state_filename: self.write_state()
1711                self.state_lock.release()
1712
1713                #raise service_error(service_error.federant,
1714                #    "Swap in failed on %s" % ",".join(failed))
1715                log.error("Swap in failed on %s" % ",".join(failed))
1716                return
1717        else:
1718            log.info("[start_segment]: Experiment %s active" % eid)
1719
1720        log.debug("[start_experiment]: removing %s" % tmpdir)
1721
1722        # Walk up tmpdir, deleting as we go
1723        for path, dirs, files in os.walk(tmpdir, topdown=False):
1724            for f in files:
1725                os.remove(os.path.join(path, f))
1726            for d in dirs:
1727                os.rmdir(os.path.join(path, d))
1728        os.rmdir(tmpdir)
1729
1730        # Insert the experiment into our state and update the disk copy
1731        self.state_lock.acquire()
1732        self.state[expid]['experimentStatus'] = 'active'
1733        self.state[eid] = self.state[expid]
1734        if self.state_filename: self.write_state()
1735        self.state_lock.release()
1736        return
1737
1738    def create_experiment(self, req, fid):
1739        """
1740        The external interface to experiment creation called from the
1741        dispatcher.
1742
1743        Creates a working directory, splits the incoming description using the
1744        splitter script and parses out the avrious subsections using the
1745        lcasses above.  Once each sub-experiment is created, use pooled threads
1746        to instantiate them and start it all up.
1747        """
1748
1749        if not self.auth.check_attribute(fid, 'create'):
1750            raise service_error(service_error.access, "Create access denied")
1751
1752        try:
1753            tmpdir = tempfile.mkdtemp(prefix="split-")
1754        except IOError:
1755            raise service_error(service_error.internal, "Cannot create tmp dir")
1756
1757        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1758        gw_secretkey_base = "fed.%s" % self.ssh_type
1759        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1760        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1761        tclfile = tmpdir + "/experiment.tcl"
1762        tbparams = { }
1763        try:
1764            access_user = self.accessdb[fid]
1765        except KeyError:
1766            raise service_error(service_error.internal,
1767                    "Access map and authorizer out of sync in " + \
1768                            "create_experiment for fedid %s"  % fid)
1769
1770        pid = "dummy"
1771        gid = "dummy"
1772        try:
1773            os.mkdir(tmpdir+"/keys")
1774        except OSError:
1775            raise service_error(service_error.internal,
1776                    "Can't make temporary dir")
1777
1778        req = req.get('CreateRequestBody', None)
1779        if not req:
1780            raise service_error(service_error.req,
1781                    "Bad request format (no CreateRequestBody)")
1782        # The tcl parser needs to read a file so put the content into that file
1783        descr=req.get('experimentdescription', None)
1784        if descr:
1785            file_content=descr.get('ns2description', None)
1786            if file_content:
1787                try:
1788                    f = open(tclfile, 'w')
1789                    f.write(file_content)
1790                    f.close()
1791                except IOError:
1792                    raise service_error(service_error.internal,
1793                            "Cannot write temp experiment description")
1794            else:
1795                raise service_error(service_error.req, 
1796                        "Only ns2descriptions supported")
1797        else:
1798            raise service_error(service_error.req, "No experiment description")
1799
1800        # Generate an ID for the experiment (slice) and a certificate that the
1801        # allocator can use to prove they own it.  We'll ship it back through
1802        # the encrypted connection.
1803        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1804
1805        if req.has_key('experimentID') and \
1806                req['experimentID'].has_key('localname'):
1807            eid = req['experimentID']['localname']
1808            self.state_lock.acquire()
1809            while (self.state.has_key(eid)):
1810                eid += random.choice(string.ascii_letters)
1811            # Initial state
1812            self.state[eid] = {
1813                    'experimentID' : \
1814                            [ { 'localname' : eid }, {'fedid': expid } ],
1815                    'experimentStatus': 'starting',
1816                    'experimentAccess': { 'X509' : expcert },
1817                    'owner': fid,
1818                    'log' : [],
1819                }
1820            self.state[expid] = self.state[eid]
1821            if self.state_filename: self.write_state()
1822            self.state_lock.release()
1823        else:
1824            eid = self.exp_stem
1825            for i in range(0,5):
1826                eid += random.choice(string.ascii_letters)
1827            self.state_lock.acquire()
1828            while (self.state.has_key(eid)):
1829                eid = self.exp_stem
1830                for i in range(0,5):
1831                    eid += random.choice(string.ascii_letters)
1832            # Initial state
1833            self.state[eid] = {
1834                    'experimentID' : \
1835                            [ { 'localname' : eid }, {'fedid': expid } ],
1836                    'experimentStatus': 'starting',
1837                    'experimentAccess': { 'X509' : expcert },
1838                    'owner': fid,
1839                    'log' : [],
1840                }
1841            self.state[expid] = self.state[eid]
1842            if self.state_filename: self.write_state()
1843            self.state_lock.release()
1844
1845        try: 
1846            # This catches exceptions to clear the placeholder if necessary
1847            try:
1848                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1849            except ValueError:
1850                raise service_error(service_error.server_config, 
1851                        "Bad key type (%s)" % self.ssh_type)
1852
1853            user = req.get('user', None)
1854            if user == None:
1855                raise service_error(service_error.req, "No user")
1856
1857            master = req.get('master', None)
1858            if not master:
1859                raise service_error(service_error.req,
1860                        "No master testbed label")
1861            export_project = req.get('exportProject', None)
1862            if not export_project:
1863                raise service_error(service_error.req, "No export project")
1864           
1865            if self.splitter_url:
1866                self.log.debug("Calling remote splitter at %s" % \
1867                        self.splitter_url)
1868                split_data = self.remote_splitter(self.splitter_url,
1869                        file_content, master)
1870            else:
1871                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1872                    str(self.muxmax), '-m', master]
1873
1874                if self.fedkit:
1875                    tclcmd.append('-k')
1876
1877                if self.gatewaykit:
1878                    tclcmd.append('-K')
1879
1880                tclcmd.extend([pid, gid, eid, tclfile])
1881
1882                self.log.debug("running local splitter %s", " ".join(tclcmd))
1883                tclparser = Popen(tclcmd, stdout=PIPE)
1884                split_data = tclparser.stdout
1885
1886            allocated = { }         # Testbeds we can access
1887            # Objects to parse the splitter output (defined above)
1888            parse_current_testbed = self.current_testbed(eid, tmpdir,
1889                    self.fedkit, self.gatewaykit)
1890            parse_allbeds = self.allbeds(self.get_access)
1891            parse_gateways = self.gateways(eid, master, tmpdir,
1892                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1893                    self.fedkit)
1894            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1895                        "^#\s+End\s+Vtopo")
1896            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1897                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1898            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1899                    "^#\s+End\s+tarfiles")
1900            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1901                    "^#\s+End\s+rpms")
1902
1903            # Working on the split data
1904            for line in split_data:
1905                line = line.rstrip()
1906                if parse_current_testbed(line, master, allocated, tbparams):
1907                    continue
1908                elif parse_allbeds(line, user, tbparams, master, export_project,
1909                        access_user):
1910                    continue
1911                elif parse_gateways(line, allocated, tbparams):
1912                    continue
1913                elif parse_vtopo(line):
1914                    continue
1915                elif parse_hostnames(line):
1916                    continue
1917                elif parse_tarfiles(line):
1918                    continue
1919                elif parse_rpms(line):
1920                    continue
1921                else:
1922                    raise service_error(service_error.internal, 
1923                            "Bad tcl parse? %s" % line)
1924            # Virtual topology and visualization
1925            vtopo = self.gentopo(parse_vtopo.str)
1926            if not vtopo:
1927                raise service_error(service_error.internal, 
1928                        "Failed to generate virtual topology")
1929
1930            vis = self.genviz(vtopo)
1931            if not vis:
1932                raise service_error(service_error.internal, 
1933                        "Failed to generate visualization")
1934
1935           
1936            # save federant information
1937            for k in allocated.keys():
1938                tbparams[k]['federant'] = {\
1939                        'name': [ { 'localname' : eid} ],\
1940                        'emulab': tbparams[k]['emulab'],\
1941                        'allocID' : tbparams[k]['allocID'],\
1942                        'master' : k == master,\
1943                    }
1944
1945            self.state_lock.acquire()
1946            self.state[eid]['vtopo'] = vtopo
1947            self.state[eid]['vis'] = vis
1948            self.state[expid]['federant'] = \
1949                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1950                        if tbparams[tb].has_key('federant') ]
1951            if self.state_filename: self.write_state()
1952            self.state_lock.release()
1953
1954            # Copy tarfiles and rpms needed at remote sites into a staging area
1955            try:
1956                if self.fedkit:
1957                    for t in self.fedkit:
1958                        parse_tarfiles.list.append(t[1])
1959                if self.gatewaykit:
1960                    for t in self.gatewaykit:
1961                        parse_tarfiles.list.append(t[1])
1962                for t in parse_tarfiles.list:
1963                    if not os.path.exists("%s/tarfiles" % tmpdir):
1964                        os.mkdir("%s/tarfiles" % tmpdir)
1965                    self.copy_file(t, "%s/tarfiles/%s" % \
1966                            (tmpdir, os.path.basename(t)))
1967                for r in parse_rpms.list:
1968                    if not os.path.exists("%s/rpms" % tmpdir):
1969                        os.mkdir("%s/rpms" % tmpdir)
1970                    self.copy_file(r, "%s/rpms/%s" % \
1971                            (tmpdir, os.path.basename(r)))
1972                # A null experiment file in case we need to create a remote
1973                # experiment from scratch
1974                f = open("%s/null.tcl" % tmpdir, "w")
1975                print >>f, """
1976set ns [new Simulator]
1977source tb_compat.tcl
1978
1979set a [$ns node]
1980
1981$ns rtproto Session
1982$ns run
1983"""
1984                f.close()
1985
1986            except IOError, e:
1987                raise service_error(service_error.internal, 
1988                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1989
1990        except service_error, e:
1991            # If something goes wrong in the parse (usually an access error)
1992            # clear the placeholder state.  From here on out the code delays
1993            # exceptions.  Failing at this point returns a fault to the remote
1994            # caller.
1995            self.state_lock.acquire()
1996            del self.state[eid]
1997            del self.state[expid]
1998            if self.state_filename: self.write_state()
1999            self.state_lock.release()
2000            raise e
2001
2002
2003        # Start the background swapper and return the starting state.  From
2004        # here on out, the state will stick around a while.
2005
2006        # Let users touch the state
2007        self.auth.set_attribute(fid, expid)
2008        self.auth.set_attribute(expid, expid)
2009
2010        # Create a logger that logs to the experiment's state object as well as
2011        # to the main log file.
2012
2013        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2014        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2015        # XXX: there should be a global one of these rather than repeating the
2016        # code.
2017        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2018                    '%d %b %y %H:%M:%S'))
2019        alloc_log.addHandler(h)
2020       
2021
2022
2023
2024
2025        # Start a thread to do the resource allocation
2026        t  = Thread(target=self.allocate_resources,
2027                args=(allocated, master, eid, expid, expcert, tbparams, 
2028                    tmpdir, alloc_log),
2029                name=eid)
2030        t.start()
2031
2032        rv = {
2033                'experimentID': [
2034                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2035                ],
2036                'experimentStatus': 'starting',
2037                'experimentAccess': { 'X509' : expcert }
2038            }
2039
2040        return rv
2041
2042    def check_experiment_access(self, fid, key):
2043        """
2044        Confirm that the fid has access to the experiment.  Though a request
2045        may be made in terms of a local name, the access attribute is always
2046        the experiment's fedid.
2047        """
2048        if not isinstance(key, fedid):
2049            self.state_lock.acquire()
2050            if self.state.has_key(key):
2051                if isinstance(self.state[key], dict):
2052                    try:
2053                        kl = [ f['fedid'] for f in \
2054                                self.state[key]['experimentID']\
2055                                    if f.has_key('fedid') ]
2056                    except KeyError:
2057                        self.state_lock.release()
2058                        raise service_error(service_error.internal, 
2059                                "No fedid for experiment %s when checking " +\
2060                                        "access(!?)" % key)
2061                    if len(kl) == 1:
2062                        key = kl[0]
2063                    else:
2064                        self.state_lock.release()
2065                        raise service_error(service_error.internal, 
2066                                "multiple fedids for experiment %s when " +\
2067                                        "checking access(!?)" % key)
2068                elif isinstance(self.state[key], str):
2069                    self.state_lock.release()
2070                    raise service_error(service_error.internal, 
2071                            ("experiment %s is placeholder.  " +\
2072                                    "Creation in progress or aborted oddly") \
2073                                    % key)
2074                else:
2075                    self.state_lock.release()
2076                    raise service_error(service_error.internal, 
2077                            "Unexpected state for %s" % key)
2078
2079            else:
2080                self.state_lock.release()
2081                raise service_error(service_error.access, "Access Denied")
2082            self.state_lock.release()
2083
2084        if self.auth.check_attribute(fid, key):
2085            return True
2086        else:
2087            raise service_error(service_error.access, "Access Denied")
2088
2089
2090
2091    def get_vtopo(self, req, fid):
2092        """
2093        Return the stored virtual topology for this experiment
2094        """
2095        rv = None
2096        state = None
2097
2098        req = req.get('VtopoRequestBody', None)
2099        if not req:
2100            raise service_error(service_error.req,
2101                    "Bad request format (no VtopoRequestBody)")
2102        exp = req.get('experiment', None)
2103        if exp:
2104            if exp.has_key('fedid'):
2105                key = exp['fedid']
2106                keytype = "fedid"
2107            elif exp.has_key('localname'):
2108                key = exp['localname']
2109                keytype = "localname"
2110            else:
2111                raise service_error(service_error.req, "Unknown lookup type")
2112        else:
2113            raise service_error(service_error.req, "No request?")
2114
2115        self.check_experiment_access(fid, key)
2116
2117        self.state_lock.acquire()
2118        if self.state.has_key(key):
2119            if self.state[key].has_key('vtopo'):
2120                rv = { 'experiment' : {keytype: key },\
2121                        'vtopo': self.state[key]['vtopo'],\
2122                    }
2123            else:
2124                state = self.state[key]['experimentStatus']
2125        self.state_lock.release()
2126
2127        if rv: return rv
2128        else: 
2129            if state:
2130                raise service_error(service_error.partial, 
2131                        "Not ready: %s" % state)
2132            else:
2133                raise service_error(service_error.req, "No such experiment")
2134
2135    def get_vis(self, req, fid):
2136        """
2137        Return the stored visualization for this experiment
2138        """
2139        rv = None
2140        state = None
2141
2142        req = req.get('VisRequestBody', None)
2143        if not req:
2144            raise service_error(service_error.req,
2145                    "Bad request format (no VisRequestBody)")
2146        exp = req.get('experiment', None)
2147        if exp:
2148            if exp.has_key('fedid'):
2149                key = exp['fedid']
2150                keytype = "fedid"
2151            elif exp.has_key('localname'):
2152                key = exp['localname']
2153                keytype = "localname"
2154            else:
2155                raise service_error(service_error.req, "Unknown lookup type")
2156        else:
2157            raise service_error(service_error.req, "No request?")
2158
2159        self.check_experiment_access(fid, key)
2160
2161        self.state_lock.acquire()
2162        if self.state.has_key(key):
2163            if self.state[key].has_key('vis'):
2164                rv =  { 'experiment' : {keytype: key },\
2165                        'vis': self.state[key]['vis'],\
2166                        }
2167            else:
2168                state = self.state[key]['experimentStatus']
2169        self.state_lock.release()
2170
2171        if rv: return rv
2172        else:
2173            if state:
2174                raise service_error(service_error.partial, 
2175                        "Not ready: %s" % state)
2176            else:
2177                raise service_error(service_error.req, "No such experiment")
2178
2179    def get_info(self, req, fid):
2180        """
2181        Return all the stored info about this experiment
2182        """
2183        rv = None
2184
2185        req = req.get('InfoRequestBody', None)
2186        if not req:
2187            raise service_error(service_error.req,
2188                    "Bad request format (no VisRequestBody)")
2189        exp = req.get('experiment', None)
2190        if exp:
2191            if exp.has_key('fedid'):
2192                key = exp['fedid']
2193                keytype = "fedid"
2194            elif exp.has_key('localname'):
2195                key = exp['localname']
2196                keytype = "localname"
2197            else:
2198                raise service_error(service_error.req, "Unknown lookup type")
2199        else:
2200            raise service_error(service_error.req, "No request?")
2201
2202        self.check_experiment_access(fid, key)
2203
2204        # The state may be massaged by the service function that called
2205        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2206        # state.
2207        self.state_lock.acquire()
2208        if self.state.has_key(key):
2209            rv = copy.deepcopy(self.state[key])
2210        self.state_lock.release()
2211
2212        if rv:
2213            # Remove the owner info (should always be there, but...)
2214            if rv.has_key('owner'): del rv['owner']
2215
2216            # Convert the log into the allocationLog parameter and remove the
2217            # log entry (with defensive programming)
2218            if rv.has_key('log'):
2219                rv['allocationLog'] = "".join(rv['log'])
2220                del rv['log']
2221            else:
2222                rv['allocationLog'] = ""
2223
2224            if rv['experimentStatus'] != 'active':
2225                if rv.has_key('federant'): del rv['federant']
2226            else:
2227                # remove the allocationID info from each federant
2228                for f in rv.get('federant', []):
2229                    if f.has_key('allocID'): del f['allocID']
2230
2231            return rv
2232        else:
2233            raise service_error(service_error.req, "No such experiment")
2234
2235
2236    def terminate_experiment(self, req, fid):
2237        """
2238        Swap this experiment out on the federants and delete the shared
2239        information
2240        """
2241        tbparams = { }
2242        req = req.get('TerminateRequestBody', None)
2243        if not req:
2244            raise service_error(service_error.req,
2245                    "Bad request format (no TerminateRequestBody)")
2246        exp = req.get('experiment', None)
2247        if exp:
2248            if exp.has_key('fedid'):
2249                key = exp['fedid']
2250                keytype = "fedid"
2251            elif exp.has_key('localname'):
2252                key = exp['localname']
2253                keytype = "localname"
2254            else:
2255                raise service_error(service_error.req, "Unknown lookup type")
2256        else:
2257            raise service_error(service_error.req, "No request?")
2258
2259        self.check_experiment_access(fid, key)
2260
2261        self.state_lock.acquire()
2262        fed_exp = self.state.get(key, None)
2263
2264        if fed_exp:
2265            # This branch of the conditional holds the lock to generate a
2266            # consistent temporary tbparams variable to deallocate experiments.
2267            # It releases the lock to do the deallocations and reacquires it to
2268            # remove the experiment state when the termination is complete.
2269
2270            # First make sure that the experiment creation is complete.
2271            status = fed_exp.get('experimentStatus', None)
2272            if status:
2273                if status == 'starting':
2274                    self.state_lock.release()
2275                    raise service_error(service_error.partial, 
2276                            'Experiment still being created')
2277            else:
2278                # No status??? trouble
2279                self.state_lock.release()
2280                raise service_error(service_error.internal,
2281                        "Experiment has no status!?")
2282
2283            ids = []
2284            #  experimentID is a list of dicts that are self-describing
2285            #  identifiers.  This finds all the fedids and localnames - the
2286            #  keys of self.state - and puts them into ids.
2287            for id in fed_exp.get('experimentID', []):
2288                if id.has_key('fedid'): ids.append(id['fedid'])
2289                if id.has_key('localname'): ids.append(id['localname'])
2290
2291            # Construct enough of the tbparams to make the stop_segment calls
2292            # work
2293            for fed in fed_exp.get('federant', []):
2294                try:
2295                    for e in fed['name']:
2296                        eid = e.get('localname', None)
2297                        if eid: break
2298                    else:
2299                        continue
2300
2301                    p = fed['emulab']['project']
2302
2303                    project = p['name']['localname']
2304                    tb = p['testbed']['localname']
2305                    user = p['user'][0]['userID']['localname']
2306
2307                    domain = fed['emulab']['domain']
2308                    host  = fed['emulab']['ops']
2309                    aid = fed['allocID']
2310                except KeyError, e:
2311                    continue
2312                tbparams[tb] = {\
2313                        'user': user,\
2314                        'domain': domain,\
2315                        'project': project,\
2316                        'host': host,\
2317                        'eid': eid,\
2318                        'aid': aid,\
2319                    }
2320            self.state_lock.release()
2321
2322            # Stop everyone.
2323            thread_pool = self.thread_pool(self.nthreads)
2324            for tb in tbparams.keys():
2325                # Create and start a thread to stop the segment
2326                thread_pool.wait_for_slot()
2327                t  = self.pooled_thread(\
2328                        target=self.stop_segment(log=self.log,
2329                            keyfile=self.ssh_privkey_file, debug=self.debug), 
2330                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2331                        pdata=thread_pool, trace_file=self.trace_file)
2332                t.start()
2333            # Wait for completions
2334            thread_pool.wait_for_all_done()
2335
2336            # release the allocations (failed experiments have done this
2337            # already)
2338            if status != 'failed':
2339                for tb in tbparams.keys():
2340                    self.release_access(tb, tbparams[tb]['aid'])
2341
2342            # Remove the terminated experiment
2343            self.state_lock.acquire()
2344            for id in ids:
2345                if self.state.has_key(id): del self.state[id]
2346
2347            if self.state_filename: self.write_state()
2348            self.state_lock.release()
2349
2350            return { 'experiment': exp }
2351        else:
2352            # Don't forget to release the lock
2353            self.state_lock.release()
2354            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.