source: fedd/federation/experiment_control.py @ 4b362df

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 4b362df was 4b362df, checked in by Ted Faber <faber@…>, 15 years ago

Segment operations into functors for migration to async creation

  • Property mode set to 100644
File size: 85.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214
215        self.cert_file = config.get("experiment_control", "cert_file")
216        if self.cert_file:
217            self.cert_pwd = config.get("experiment_control", "cert_pwd")
218        else:
219            self.cert_file = config.get("globals", "cert_file")
220            self.cert_pwd = config.get("globals", "cert_pwd")
221
222        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
223                or config.get("globals", "trusted_certs")
224
225        self.exp_stem = "fed-stem"
226        self.log = logging.getLogger("fedd.experiment_control")
227        set_log_level(config, "experiment_control", self.log)
228        self.muxmax = 2
229        self.nthreads = 2
230        self.randomize_experiments = False
231
232        self.splitter = None
233        self.ssh_keygen = "/usr/bin/ssh-keygen"
234        self.ssh_identity_file = None
235
236
237        self.debug = config.getboolean("experiment_control", "create_debug")
238        self.state_filename = config.get("experiment_control", 
239                "experiment_state")
240        self.splitter_url = config.get("experiment_control", "splitter_uri")
241        self.fedkit = parse_tarfile_list(\
242                config.get("experiment_control", "fedkit"))
243        self.gatewaykit = parse_tarfile_list(\
244                config.get("experiment_control", "gatewaykit"))
245        accessdb_file = config.get("experiment_control", "accessdb")
246
247        self.ssh_pubkey_file = config.get("experiment_control", 
248                "ssh_pubkey_file")
249        self.ssh_privkey_file = config.get("experiment_control",
250                "ssh_privkey_file")
251        # NB for internal master/slave ops, not experiment setup
252        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
253        self.state = { }
254        self.state_lock = Lock()
255        self.tclsh = "/usr/local/bin/otclsh"
256        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
257                config.get("experiment_control", "tcl_splitter",
258                        "/usr/testbed/lib/ns2ir/parse.tcl")
259        mapdb_file = config.get("experiment_control", "mapdb")
260        self.trace_file = sys.stderr
261
262        self.def_expstart = \
263                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
264                "/tmp/federate";
265        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
266                "FEDDIR/hosts";
267        self.def_gwstart = \
268                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
269                "/tmp/bridge.log";
270        self.def_mgwstart = \
271                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
272                "/tmp/bridge.log";
273        self.def_gwimage = "FBSD61-TUNNEL2";
274        self.def_gwtype = "pc";
275        self.local_access = { }
276
277        if auth:
278            self.auth = auth
279        else:
280            self.log.error(\
281                    "[access]: No authorizer initialized, creating local one.")
282            auth = authorizer()
283
284
285        if self.ssh_pubkey_file:
286            try:
287                f = open(self.ssh_pubkey_file, 'r')
288                self.ssh_pubkey = f.read()
289                f.close()
290            except IOError:
291                raise service_error(service_error.internal,
292                        "Cannot read sshpubkey")
293        else:
294            raise service_error(service_error.internal, 
295                    "No SSH public key file?")
296
297        if not self.ssh_privkey_file:
298            raise service_error(service_error.internal, 
299                    "No SSH public key file?")
300
301
302        if mapdb_file:
303            self.read_mapdb(mapdb_file)
304        else:
305            self.log.warn("[experiment_control] No testbed map, using defaults")
306            self.tbmap = { 
307                    'deter':'https://users.isi.deterlab.net:23235',
308                    'emulab':'https://users.isi.deterlab.net:23236',
309                    'ucb':'https://users.isi.deterlab.net:23237',
310                    }
311
312        if accessdb_file:
313                self.read_accessdb(accessdb_file)
314        else:
315            raise service_error(service_error.internal,
316                    "No accessdb specified in config")
317
318        # Grab saved state.  OK to do this w/o locking because it's read only
319        # and only one thread should be in existence that can see self.state at
320        # this point.
321        if self.state_filename:
322            self.read_state()
323
324        # Dispatch tables
325        self.soap_services = {\
326                'Create': soap_handler('Create', self.create_experiment),
327                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
328                'Vis': soap_handler('Vis', self.get_vis),
329                'Info': soap_handler('Info', self.get_info),
330                'Terminate': soap_handler('Terminate', 
331                    self.terminate_experiment),
332        }
333
334        self.xmlrpc_services = {\
335                'Create': xmlrpc_handler('Create', self.create_experiment),
336                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
337                'Vis': xmlrpc_handler('Vis', self.get_vis),
338                'Info': xmlrpc_handler('Info', self.get_info),
339                'Terminate': xmlrpc_handler('Terminate',
340                    self.terminate_experiment),
341        }
342
343    def copy_file(self, src, dest, size=1024):
344        """
345        Exceedingly simple file copy.
346        """
347        s = open(src,'r')
348        d = open(dest, 'w')
349
350        buf = "x"
351        while buf != "":
352            buf = s.read(size)
353            d.write(buf)
354        s.close()
355        d.close()
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            self.copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386        try:
387            f = open(self.state_filename, "r")
388            self.state = pickle.load(f)
389            self.log.debug("[read_state]: Read state from %s" % \
390                    self.state_filename)
391        except IOError, e:
392            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
393                    % (self.state_filename, e))
394        except pickle.UnpicklingError, e:
395            self.log.warning(("[read_state]: No saved state: " + \
396                    "Unpickling failed: %s") % e)
397       
398        for k in self.state.keys():
399            try:
400                # This list should only have one element in it, but phrasing it
401                # as a for loop doesn't cost much, really.  We have to find the
402                # fedid elements anyway.
403                for eid in [ f['fedid'] \
404                        for f in self.state[k]['experimentID']\
405                            if f.has_key('fedid') ]:
406                    self.auth.set_attribute(self.state[k]['owner'], eid)
407            except KeyError, e:
408                self.log.warning("[read_state]: State ownership or identity " +\
409                        "misformatted in %s: %s" % (self.state_filename, e))
410
411
412    def read_accessdb(self, accessdb_file):
413        """
414        Read the mapping from fedids that can create experiments to their name
415        in the 3-level access namespace.  All will be asserted from this
416        testbed and can include the local username and porject that will be
417        asserted on their behalf by this fedd.  Each fedid is also added to the
418        authorization system with the "create" attribute.
419        """
420        self.accessdb = {}
421        # These are the regexps for parsing the db
422        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
423        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
424                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
425        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
426                "\s*->\s*(" + name_expr + ")\s*$")
427        lineno = 0
428
429        # Parse the mappings and store in self.authdb, a dict of
430        # fedid -> (proj, user)
431        try:
432            f = open(accessdb_file, "r")
433            for line in f:
434                lineno += 1
435                line = line.strip()
436                if len(line) == 0 or line.startswith('#'):
437                    continue
438                m = project_line.match(line)
439                if m:
440                    fid = fedid(hexstr=m.group(1))
441                    project, user = m.group(2,3)
442                    if not self.accessdb.has_key(fid):
443                        self.accessdb[fid] = []
444                    self.accessdb[fid].append((project, user))
445                    continue
446
447                m = user_line.match(line)
448                if m:
449                    fid = fedid(hexstr=m.group(1))
450                    project = None
451                    user = m.group(2)
452                    if not self.accessdb.has_key(fid):
453                        self.accessdb[fid] = []
454                    self.accessdb[fid].append((project, user))
455                    continue
456                self.log.warn("[experiment_control] Error parsing access " +\
457                        "db %s at line %d" %  (accessdb_file, lineno))
458        except IOError:
459            raise service_error(service_error.internal,
460                    "Error opening/reading %s as experiment " +\
461                            "control accessdb" %  accessdb_file)
462        f.close()
463
464        # Initialize the authorization attributes
465        for fid in self.accessdb.keys():
466            self.auth.set_attribute(fid, 'create')
467
468    def read_mapdb(self, file):
469        """
470        Read a simple colon separated list of mappings for the
471        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
472        """
473
474        self.tbmap = { }
475        lineno =0
476        try:
477            f = open(file, "r")
478            for line in f:
479                lineno += 1
480                line = line.strip()
481                if line.startswith('#') or len(line) == 0:
482                    continue
483                try:
484                    label, url = line.split(':', 1)
485                    self.tbmap[label] = url
486                except ValueError, e:
487                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
488                            "map db: %s %s" % (lineno, line, e))
489        except IOError, e:
490            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
491                    "open %s: %s" % (file, e))
492        f.close()
493
494    class emulab_segment:
495        def __init__(self, log=None, keyfile=None, debug=False):
496            self.log = log or logging.getLogger(\
497                    'fedd.experiment_control.emulab_segment')
498            self.ssh_privkey_file = keyfile
499            self.debug = debug
500            self.ssh_exec="/usr/bin/ssh"
501            self.scp_exec = "/usr/bin/scp"
502            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
503
504        def scp_file(self, file, user, host, dest=""):
505            """
506            scp a file to the remote host.  If debug is set the action is only
507            logged.
508            """
509
510            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
511                    '-o', 'StrictHostKeyChecking yes', '-i', 
512                    self.ssh_privkey_file, file, 
513                    "%s@%s:%s" % (user, host, dest)]
514            rv = 0
515
516            try:
517                dnull = open("/dev/null", "w")
518            except IOError:
519                self.log.debug("[ssh_file]: failed to open " + \
520                        "/dev/null for redirect")
521                dnull = Null
522
523            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
524            if not self.debug:
525                rv = call(scp_cmd, stdout=dnull, stderr=dnull)
526
527            return rv == 0
528
529        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
530            """
531            Run a remote command on host as user.  If debug is set, the action is
532            only logged.
533            """
534            sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
535                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
536                    (self.ssh_exec, self.ssh_privkey_file, 
537                            user, host, cmd)
538
539            try:
540                dnull = open("/dev/null", "r")
541            except IOError:
542                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
543                        "for redirect")
544                dnull = Null
545
546            self.log.debug("[ssh_cmd]: %s" % sh_str)
547            if not self.debug:
548                if dnull:
549                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
550                else:
551                    sub = Popen(sh_str, shell=True)
552                if timeout:
553                    i = 0
554                    rv = sub.poll()
555                    while i < timeout:
556                        if rv is not None: break
557                        else:
558                            time.sleep(1)
559                            rv = sub.poll()
560                            i += 1
561                    else:
562                        self.log.debug("Process exceeded runtime: %s" % sh_str)
563                        os.kill(sub.pid, signal.SIGKILL)
564                        raise self.ssh_cmd_timeout();
565                    return rv == 0
566                else:
567                    return sub.wait() == 0
568            else:
569                return True
570
571    class start_segment(emulab_segment):
572        def __init__(self, log=None, keyfile=None, debug=False):
573            experiment_control_local.emulab_segment.__init__(self,
574                    log=log, keyfile=keyfile, debug=debug)
575
576        def create_config_tree(self, src_dir, dest_dir, script):
577            """
578            Append commands to script that will create the directory hierarchy
579            on the remote federant.
580            """
581
582            if os.path.isdir(src_dir):
583                print >>script, "mkdir -p %s" % dest_dir
584                print >>script, "chmod 770 %s" % dest_dir
585
586                for f in os.listdir(src_dir):
587                    if os.path.isdir(f):
588                        self.create_config_tree("%s/%s" % (src_dir, f), 
589                                "%s/%s" % (dest_dir, f), script)
590            else:
591                self.log.debug("[create_config_tree]: Not a directory: %s" \
592                        % src_dir)
593
594        def ship_configs(self, host, user, src_dir, dest_dir):
595            """
596            Copy federant-specific configuration files to the federant.
597            """
598            for f in os.listdir(src_dir):
599                if os.path.isdir(f):
600                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
601                            "%s/%s" % (dest_dir, f)):
602                        return False
603                else:
604                    if not self.scp_file("%s/%s" % (src_dir, f), 
605                            user, host, dest_dir):
606                        return False
607            return True
608
609        def get_state(self, user, host, tb, pid, eid):
610            # command to test experiment state
611            expinfo_exec = "/usr/testbed/bin/expinfo" 
612            # Regular expressions to parse the expinfo response
613            state_re = re.compile("State:\s+(\w+)")
614            no_exp_re = re.compile("^No\s+such\s+experiment")
615            swapping_re = re.compile("^No\s+information\s+available.")
616            state = None    # Experiment state parsed from expinfo
617            # The expinfo ssh command.  Note the identity restriction to use
618            # only the identity provided in the pubkey given.
619            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
620                    'StrictHostKeyChecking yes', '-i', 
621                    self.ssh_privkey_file, "%s@%s" % (user, host), 
622                    expinfo_exec, pid, eid]
623
624            dev_null = None
625            try:
626                dev_null = open("/dev/null", "a")
627            except IOError, e:
628                self.log.error("[get_state]: can't open /dev/null: %s" %e)
629
630            if self.debug:
631                state = 'swapped'
632                rv = 0
633            else:
634                status = Popen(cmd, stdout=PIPE, stderr=dev_null)
635                for line in status.stdout:
636                    m = state_re.match(line)
637                    if m: state = m.group(1)
638                    else:
639                        for reg, st in ((no_exp_re, "none"),
640                                (swapping_re, "swapping")):
641                            m = reg.match(line)
642                            if m: state = st
643                rv = status.wait()
644
645            # If the experiment is not present the subcommand returns a
646            # non-zero return value.  If we successfully parsed a "none"
647            # outcome, ignore the return code.
648            if rv != 0 and state != 'none':
649                raise service_error(service_error.internal,
650                        "Cannot get status of segment %s:%s/%s" % \
651                                (tb, pid, eid))
652            elif state not in ('active', 'swapped', 'swapping', 'none'):
653                raise service_error(service_error.internal,
654                        "Cannot get status of segment %s:%s/%s" % \
655                                (tb, pid, eid))
656            else: return state
657
658
659        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
660            """
661            Start a sub-experiment on a federant.
662
663            Get the current state, modify or create as appropriate, ship data
664            and configs and start the experiment.  There are small ordering
665            differences based on the initial state of the sub-experiment.
666            """
667            # ops node in the federant
668            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
669            user = tbparams[tb]['user']     # federant user
670            pid = tbparams[tb]['project']   # federant project
671            # XXX
672            base_confs = ( "hosts",)
673            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
674            # Configuration directories on the remote machine
675            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
676            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
677            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
678
679            state = self.get_state(user, host, tb, pid, eid)
680
681            self.log.debug("[start_segment]: %s: %s" % (tb, state))
682            self.log.info("[start_segment]:transferring experiment to %s" % tb)
683
684            if not self.scp_file("%s/%s/%s" % \
685                    (tmpdir, tb, tclfile), user, host):
686                return False
687           
688            if state == 'none':
689                # Create a null copy of the experiment so that we capture any
690                # logs there if the modify fails.  Emulab software discards the
691                # logs from a failed startexp
692                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
693                    return False
694                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
695                timedout = False
696                try:
697                    if not self.ssh_cmd(user, host,
698                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
699                            "-e %s null.tcl") % (pid, eid), "startexp",
700                            timeout=60 * 10):
701                        return False
702                except self.ssh_cmd_timeout:
703                    timedout = True
704
705                if timedout:
706                    state = self.get_state(user, host, self.ssh_privkey_file, 
707                            tb, eid, pid)
708                    if state != "swapped":
709                        return False
710
711           
712            # Open up a temporary file to contain a script for setting up the
713            # filespace for the new experiment.
714            self.log.info("[start_segment]: creating script file")
715            try:
716                sf, scriptname = tempfile.mkstemp()
717                scriptfile = os.fdopen(sf, 'w')
718            except IOError:
719                return False
720
721            scriptbase = os.path.basename(scriptname)
722
723            # Script the filesystem changes
724            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
725            # Clear and create the tarfiles and rpm directories
726            for d in (tarfiles_dir, rpms_dir):
727                print >>scriptfile, "/bin/rm -rf %s/*" % d
728                print >>scriptfile, "mkdir -p %s" % d
729            print >>scriptfile, 'mkdir -p %s' % proj_dir
730            self.create_config_tree("%s/%s" % (tmpdir, tb),
731                    proj_dir, scriptfile)
732            if os.path.isdir("%s/tarfiles" % tmpdir):
733                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
734                        scriptfile)
735            if os.path.isdir("%s/rpms" % tmpdir):
736                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
737                        scriptfile)
738            print >>scriptfile, "rm -f %s" % scriptbase
739            scriptfile.close()
740
741            # Move the script to the remote machine
742            # XXX: could collide tempfile names on the remote host
743            if self.scp_file(scriptname, user, host, scriptbase):
744                os.remove(scriptname)
745            else:
746                return False
747
748            # Execute the script (and the script's last line deletes it)
749            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
750                return False
751
752            for f in base_confs:
753                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
754                        "%s/%s" % (proj_dir, f)):
755                    return False
756            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
757                    proj_dir):
758                return False
759            if os.path.isdir("%s/tarfiles" % tmpdir):
760                if not self.ship_configs(host, user,
761                        "%s/tarfiles" % tmpdir, tarfiles_dir):
762                    return False
763            if os.path.isdir("%s/rpms" % tmpdir):
764                if not self.ship_configs(host, user,
765                        "%s/rpms" % tmpdir, tarfiles_dir):
766                    return False
767            # Stage the new configuration (active experiments will stay swapped
768            # in now)
769            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
770            try:
771                if not self.ssh_cmd(user, host,
772                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
773                                (pid, eid, tclfile),
774                        "modexp", timeout= 60 * 10):
775                    return False
776            except self.ssh_cmd_timeout:
777                print "modexp timeout"
778                # There's really no way to see if this succeeded or failed, so
779                # if it hangs, assume the worst.
780                return False
781            # Active experiments are still swapped, this swaps the others in.
782            if state != 'active':
783                self.log.info("[start_segment]: Swapping %s in on %s" % \
784                        (eid, tb))
785                timedout = False
786                try:
787                    if not self.ssh_cmd(user, host,
788                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
789                            "swapexp", timeout=10*60):
790                        return False
791                except self.ssh_cmd_timeout:
792                    timedout = True
793               
794                # If the command was terminated, but completed successfully,
795                # report success.
796                if timedout:
797                    state = self.get_state(user, host, self.ssh_privkey_file,
798                            tb, eid, pid)
799                    self.log.debug("[start_segment]: swapin timed out (state)")
800                    return state == 'active'
801            # Everything has gone OK.
802            return True
803
804    class stop_segment(emulab_segment):
805        def __init__(self, log=None, keyfile=None, debug=False):
806            experiment_control_local.emulab_segment.__init__(self,
807                    log=log, keyfile=keyfile, debug=debug)
808
809        def __call__(self, tb, eid, tbparams):
810            """
811            Stop a sub experiment by calling swapexp on the federant
812            """
813            user = tbparams[tb]['user']
814            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
815            pid = tbparams[tb]['project']
816
817            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
818            return self.ssh_cmd(user, host,
819                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
820
821       
822    def generate_ssh_keys(self, dest, type="rsa" ):
823        """
824        Generate a set of keys for the gateways to use to talk.
825
826        Keys are of type type and are stored in the required dest file.
827        """
828        valid_types = ("rsa", "dsa")
829        t = type.lower();
830        if t not in valid_types: raise ValueError
831        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
832
833        try:
834            trace = open("/dev/null", "w")
835        except IOError:
836            raise service_error(service_error.internal,
837                    "Cannot open /dev/null??");
838
839        # May raise CalledProcessError
840        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
841        rv = call(cmd, stdout=trace, stderr=trace)
842        if rv != 0:
843            raise service_error(service_error.internal, 
844                    "Cannot generate nonce ssh keys.  %s return code %d" \
845                            % (self.ssh_keygen, rv))
846
847    def gentopo(self, str):
848        """
849        Generate the topology dtat structure from the splitter's XML
850        representation of it.
851
852        The topology XML looks like:
853            <experiment>
854                <nodes>
855                    <node><vname></vname><ips>ip1:ip2</ips></node>
856                </nodes>
857                <lans>
858                    <lan>
859                        <vname></vname><vnode></vnode><ip></ip>
860                        <bandwidth></bandwidth><member>node:port</member>
861                    </lan>
862                </lans>
863        """
864        class topo_parse:
865            """
866            Parse the topology XML and create the dats structure.
867            """
868            def __init__(self):
869                # Typing of the subelements for data conversion
870                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
871                self.int_subelements = ( 'bandwidth',)
872                self.float_subelements = ( 'delay',)
873                # The final data structure
874                self.nodes = [ ]
875                self.lans =  [ ]
876                self.topo = { \
877                        'node': self.nodes,\
878                        'lan' : self.lans,\
879                    }
880                self.element = { }  # Current element being created
881                self.chars = ""     # Last text seen
882
883            def end_element(self, name):
884                # After each sub element the contents is added to the current
885                # element or to the appropriate list.
886                if name == 'node':
887                    self.nodes.append(self.element)
888                    self.element = { }
889                elif name == 'lan':
890                    self.lans.append(self.element)
891                    self.element = { }
892                elif name in self.str_subelements:
893                    self.element[name] = self.chars
894                    self.chars = ""
895                elif name in self.int_subelements:
896                    self.element[name] = int(self.chars)
897                    self.chars = ""
898                elif name in self.float_subelements:
899                    self.element[name] = float(self.chars)
900                    self.chars = ""
901
902            def found_chars(self, data):
903                self.chars += data.rstrip()
904
905
906        tp = topo_parse();
907        parser = xml.parsers.expat.ParserCreate()
908        parser.EndElementHandler = tp.end_element
909        parser.CharacterDataHandler = tp.found_chars
910
911        parser.Parse(str)
912
913        return tp.topo
914       
915
916    def genviz(self, topo):
917        """
918        Generate the visualization the virtual topology
919        """
920
921        neato = "/usr/local/bin/neato"
922        # These are used to parse neato output and to create the visualization
923        # file.
924        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
925        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
926                "%s</type></node>"
927
928        try:
929            # Node names
930            nodes = [ n['vname'] for n in topo['node'] ]
931            topo_lans = topo['lan']
932        except KeyError:
933            raise service_error(service_error.internal, "Bad topology")
934
935        lans = { }
936        links = { }
937
938        # Walk through the virtual topology, organizing the connections into
939        # 2-node connections (links) and more-than-2-node connections (lans).
940        # When a lan is created, it's added to the list of nodes (there's a
941        # node in the visualization for the lan).
942        for l in topo_lans:
943            if links.has_key(l['vname']):
944                if len(links[l['vname']]) < 2:
945                    links[l['vname']].append(l['vnode'])
946                else:
947                    nodes.append(l['vname'])
948                    lans[l['vname']] = links[l['vname']]
949                    del links[l['vname']]
950                    lans[l['vname']].append(l['vnode'])
951            elif lans.has_key(l['vname']):
952                lans[l['vname']].append(l['vnode'])
953            else:
954                links[l['vname']] = [ l['vnode'] ]
955
956
957        # Open up a temporary file for dot to turn into a visualization
958        try:
959            df, dotname = tempfile.mkstemp()
960            dotfile = os.fdopen(df, 'w')
961        except IOError:
962            raise service_error(service_error.internal,
963                    "Failed to open file in genviz")
964
965        # Generate a dot/neato input file from the links, nodes and lans
966        try:
967            print >>dotfile, "graph G {"
968            for n in nodes:
969                print >>dotfile, '\t"%s"' % n
970            for l in links.keys():
971                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
972            for l in lans.keys():
973                for n in lans[l]:
974                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
975            print >>dotfile, "}"
976            dotfile.close()
977        except TypeError:
978            raise service_error(service_error.internal,
979                    "Single endpoint link in vtopo")
980        except IOError:
981            raise service_error(service_error.internal, "Cannot write dot file")
982
983        # Use dot to create a visualization
984        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
985                '-Gpack=true', dotname], stdout=PIPE)
986
987        # Translate dot to vis format
988        vis_nodes = [ ]
989        vis = { 'node': vis_nodes }
990        for line in dot.stdout:
991            m = vis_re.match(line)
992            if m:
993                vn = m.group(1)
994                vis_node = {'name': vn, \
995                        'x': float(m.group(2)),\
996                        'y' : float(m.group(3)),\
997                    }
998                if vn in links.keys() or vn in lans.keys():
999                    vis_node['type'] = 'lan'
1000                else:
1001                    vis_node['type'] = 'node'
1002                vis_nodes.append(vis_node)
1003        rv = dot.wait()
1004
1005        os.remove(dotname)
1006        if rv == 0 : return vis
1007        else: return None
1008
1009    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1010            access_user):
1011        """
1012        Get access to testbed through fedd and set the parameters for that tb
1013        """
1014        uri = self.tbmap.get(tb, None)
1015        if not uri:
1016            raise service_error(serice_error.server_config, 
1017                    "Unknown testbed: %s" % tb)
1018
1019        # currently this lumps all users into one service access group
1020        service_keys = [ a for u in user \
1021                for a in u.get('access', []) \
1022                    if a.has_key('sshPubkey')]
1023
1024        if len(service_keys) == 0:
1025            raise service_error(service_error.req, 
1026                    "Must have at least one SSH pubkey for services")
1027
1028
1029        for p, u in access_user:
1030            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1031                    "to %s") %  ((p or "None"), u, uri))
1032
1033            if p:
1034                # Request with user and project specified
1035                req = {\
1036                        'destinationTestbed' : { 'uri' : uri },
1037                        'project': { 
1038                            'name': {'localname': p},
1039                            'user': [ {'userID': { 'localname': u } } ],
1040                            },
1041                        'user':  user,
1042                        'allocID' : { 'localname': 'test' },
1043                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1044                        'serviceAccess' : service_keys
1045                    }
1046            else:
1047                # Request with only user specified
1048                req = {\
1049                        'destinationTestbed' : { 'uri' : uri },
1050                        'user':  [ {'userID': { 'localname': u } } ],
1051                        'allocID' : { 'localname': 'test' },
1052                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1053                        'serviceAccess' : service_keys
1054                    }
1055
1056            if tb == master:
1057                # NB, the export_project parameter is a dict that includes
1058                # the type
1059                req['exportProject'] = export_project
1060
1061            # node resources if any
1062            if nodes != None and len(nodes) > 0:
1063                rnodes = [ ]
1064                for n in nodes:
1065                    rn = { }
1066                    image, hw, count = n.split(":")
1067                    if image: rn['image'] = [ image ]
1068                    if hw: rn['hardware'] = [ hw ]
1069                    if count and int(count) >0 : rn['count'] = int(count)
1070                    rnodes.append(rn)
1071                req['resources']= { }
1072                req['resources']['node'] = rnodes
1073
1074            try:
1075                if self.local_access.has_key(uri):
1076                    # Local access call
1077                    req = { 'RequestAccessRequestBody' : req }
1078                    r = self.local_access[uri].RequestAccess(req, 
1079                            fedid(file=self.cert_file))
1080                    r = { 'RequestAccessResponseBody' : r }
1081                else:
1082                    r = self.call_RequestAccess(uri, req, 
1083                            self.cert_file, self.cert_pwd, self.trusted_certs)
1084            except service_error, e:
1085                if e.code == service_error.access:
1086                    self.log.debug("[get_access] Access denied")
1087                    r = None
1088                    continue
1089                else:
1090                    raise e
1091
1092            if r.has_key('RequestAccessResponseBody'):
1093                # Through to here we have a valid response, not a fault.
1094                # Access denied is a fault, so something better or worse than
1095                # access denied has happened.
1096                r = r['RequestAccessResponseBody']
1097                self.log.debug("[get_access] Access granted")
1098                break
1099            else:
1100                raise service_error(service_error.protocol,
1101                        "Bad proxy response")
1102       
1103        if not r:
1104            raise service_error(service_error.access, 
1105                    "Access denied by %s (%s)" % (tb, uri))
1106
1107        e = r['emulab']
1108        p = e['project']
1109        tbparam[tb] = { 
1110                "boss": e['boss'],
1111                "host": e['ops'],
1112                "domain": e['domain'],
1113                "fs": e['fileServer'],
1114                "eventserver": e['eventServer'],
1115                "project": unpack_id(p['name']),
1116                "emulab" : e,
1117                "allocID" : r['allocID'],
1118                }
1119        # Make the testbed name be the label the user applied
1120        p['testbed'] = {'localname': tb }
1121
1122        for u in p['user']:
1123            role = u.get('role', None)
1124            if role == 'experimentCreation':
1125                tbparam[tb]['user'] = unpack_id(u['userID'])
1126                break
1127        else:
1128            raise service_error(service_error.internal, 
1129                    "No createExperimentUser from %s" %tb)
1130
1131        # Add attributes to barameter space.  We don't allow attributes to
1132        # overlay any parameters already installed.
1133        for a in e['fedAttr']:
1134            try:
1135                if a['attribute'] and isinstance(a['attribute'], basestring)\
1136                        and not tbparam[tb].has_key(a['attribute'].lower()):
1137                    tbparam[tb][a['attribute'].lower()] = a['value']
1138            except KeyError:
1139                self.log.error("Bad attribute in response: %s" % a)
1140       
1141    def release_access(self, tb, aid):
1142        """
1143        Release access to testbed through fedd
1144        """
1145
1146        uri = self.tbmap.get(tb, None)
1147        if not uri:
1148            raise service_error(serice_error.server_config, 
1149                    "Unknown testbed: %s" % tb)
1150
1151        if self.local_access.has_key(uri):
1152            resp = self.local_access[uri].ReleaseAccess(\
1153                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1154                    fedid(file=self.cert_file))
1155            resp = { 'ReleaseAccessResponseBody': resp } 
1156        else:
1157            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1158                    self.cert_file, self.cert_pwd, self.trusted_certs)
1159
1160        # better error coding
1161
1162    def remote_splitter(self, uri, desc, master):
1163
1164        req = {
1165                'description' : { 'ns2description': desc },
1166                'master': master,
1167                'include_fedkit': bool(self.fedkit),
1168                'include_gatewaykit': bool(self.gatewaykit)
1169            }
1170
1171        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1172                self.trusted_certs)
1173
1174        if r.has_key('Ns2SplitResponseBody'):
1175            r = r['Ns2SplitResponseBody']
1176            if r.has_key('output'):
1177                return r['output'].splitlines()
1178            else:
1179                raise service_error(service_error.protocol, 
1180                        "Bad splitter response (no output)")
1181        else:
1182            raise service_error(service_error.protocol, "Bad splitter response")
1183       
1184    class current_testbed:
1185        """
1186        Object for collecting the current testbed description.  The testbed
1187        description is saved to a file with the local testbed variables
1188        subsittuted line by line.
1189        """
1190        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1191            def tar_list_to_string(tl):
1192                if tl is None: return None
1193
1194                rv = ""
1195                for t in tl:
1196                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1197                            (t[0], os.path.basename(t[1]))
1198                return rv
1199
1200
1201            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1202            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1203            self.current_testbed = None
1204            self.testbed_file = None
1205
1206            self.def_expstart = \
1207                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1208            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1209            self.def_gwstart = \
1210                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1211            self.def_mgwstart = \
1212                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1213            self.def_gwimage = "FBSD61-TUNNEL2";
1214            self.def_gwtype = "pc";
1215            self.def_mgwcmd = '# '
1216            self.def_mgwcmdparams = ''
1217            self.def_gwcmd = '# '
1218            self.def_gwcmdparams = ''
1219
1220            self.eid = eid
1221            self.tmpdir = tmpdir
1222            # Convert fedkit and gateway kit (which are lists of tuples) into a
1223            # substituition string.
1224            self.fedkit = tar_list_to_string(fedkit)
1225            self.gatewaykit = tar_list_to_string(gatewaykit)
1226
1227        def __call__(self, line, master, allocated, tbparams):
1228            # Capture testbed topology descriptions
1229            if self.current_testbed == None:
1230                m = self.begin_testbed.match(line)
1231                if m != None:
1232                    self.current_testbed = m.group(1)
1233                    if self.current_testbed == None:
1234                        raise service_error(service_error.req,
1235                                "Bad request format (unnamed testbed)")
1236                    allocated[self.current_testbed] = \
1237                            allocated.get(self.current_testbed,0) + 1
1238                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1239                    if not os.path.exists(tb_dir):
1240                        try:
1241                            os.mkdir(tb_dir)
1242                        except IOError:
1243                            raise service_error(service_error.internal,
1244                                    "Cannot create %s" % tb_dir)
1245                    try:
1246                        self.testbed_file = open("%s/%s.%s.tcl" %
1247                                (tb_dir, self.eid, self.current_testbed), 'w')
1248                    except IOError:
1249                        self.testbed_file = None
1250                    return True
1251                else: return False
1252            else:
1253                m = self.end_testbed.match(line)
1254                if m != None:
1255                    if m.group(1) != self.current_testbed:
1256                        raise service_error(service_error.internal, 
1257                                "Mismatched testbed markers!?")
1258                    if self.testbed_file != None: 
1259                        self.testbed_file.close()
1260                        self.testbed_file = None
1261                    self.current_testbed = None
1262                elif self.testbed_file:
1263                    # Substitute variables and put the line into the local
1264                    # testbed file.
1265                    gwtype = tbparams[self.current_testbed].get(\
1266                            'connectortype', self.def_gwtype)
1267                    gwimage = tbparams[self.current_testbed].get(\
1268                            'connectorimage', self.def_gwimage)
1269                    mgwstart = tbparams[self.current_testbed].get(\
1270                            'masterconnectorstartcmd', self.def_mgwstart)
1271                    mexpstart = tbparams[self.current_testbed].get(\
1272                            'masternodestartcmd', self.def_mexpstart)
1273                    gwstart = tbparams[self.current_testbed].get(\
1274                            'slaveconnectorstartcmd', self.def_gwstart)
1275                    expstart = tbparams[self.current_testbed].get(\
1276                            'slavenodestartcmd', self.def_expstart)
1277                    project = tbparams[self.current_testbed].get('project')
1278                    gwcmd = tbparams[self.current_testbed].get(\
1279                            'slaveconnectorcmd', self.def_gwcmd)
1280                    gwcmdparams = tbparams[self.current_testbed].get(\
1281                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1282                    mgwcmd = tbparams[self.current_testbed].get(\
1283                            'masterconnectorcmd', self.def_gwcmd)
1284                    mgwcmdparams = tbparams[self.current_testbed].get(\
1285                            'masterconnectorcmdparams', self.def_gwcmdparams)
1286                    line = re.sub("GWTYPE", gwtype, line)
1287                    line = re.sub("GWIMAGE", gwimage, line)
1288                    if self.current_testbed == master:
1289                        line = re.sub("GWSTART", mgwstart, line)
1290                        line = re.sub("EXPSTART", mexpstart, line)
1291                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1292                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1293                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1294                    else:
1295                        line = re.sub("GWSTART", gwstart, line)
1296                        line = re.sub("EXPSTART", expstart, line)
1297                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1298                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1299                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1300                    #These expansions contain EID and PROJDIR.  NB these are
1301                    # local fedkit and gatewaykit, which are strings.
1302                    if self.fedkit:
1303                        line = re.sub("FEDKIT", self.fedkit, line)
1304                    if self.gatewaykit:
1305                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1306                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1307                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1308                    line = re.sub("EID", self.eid, line)
1309                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1310                            (project, self.eid), line)
1311                    print >>self.testbed_file, line
1312                return True
1313
1314    class allbeds:
1315        """
1316        Process the Allbeds section.  Get access to each federant and save the
1317        parameters in tbparams
1318        """
1319        def __init__(self, get_access):
1320            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1321            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1322            self.in_allbeds = False
1323            self.get_access = get_access
1324
1325        def __call__(self, line, user, tbparams, master, export_project,
1326                access_user):
1327            # Testbed access parameters
1328            if not self.in_allbeds:
1329                if self.begin_allbeds.match(line):
1330                    self.in_allbeds = True
1331                    return True
1332                else:
1333                    return False
1334            else:
1335                if self.end_allbeds.match(line):
1336                    self.in_allbeds = False
1337                else:
1338                    nodes = line.split('|')
1339                    tb = nodes.pop(0)
1340                    self.get_access(tb, nodes, user, tbparams, master,
1341                            export_project, access_user)
1342                return True
1343
1344    class gateways:
1345        def __init__(self, eid, master, tmpdir, gw_pubkey,
1346                gw_secretkey, copy_file, fedkit):
1347            self.begin_gateways = \
1348                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1349            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1350            self.current_gateways = None
1351            self.control_gateway = None
1352            self.active_end = { }
1353
1354            self.eid = eid
1355            self.master = master
1356            self.tmpdir = tmpdir
1357            self.gw_pubkey_base = gw_pubkey
1358            self.gw_secretkey_base = gw_secretkey
1359
1360            self.copy_file = copy_file
1361            self.fedkit = fedkit
1362
1363
1364        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1365                active_end, tbparams, dtb, myname, desthost, type):
1366            """
1367            Produce a gateway configuration file from a gateways line.
1368            """
1369
1370            sproject = tbparams[gw].get('project', 'project')
1371            dproject = tbparams[dtb].get('project', 'project')
1372            sdomain = ".%s.%s%s" % (eid, sproject,
1373                    tbparams[gw].get('domain', ".example.com"))
1374            ddomain = ".%s.%s%s" % (eid, dproject,
1375                    tbparams[dtb].get('domain', ".example.com"))
1376            boss = tbparams[master].get('boss', "boss")
1377            fs = tbparams[master].get('fs', "fs")
1378            event_server = "%s%s" % \
1379                    (tbparams[gw].get('eventserver', "event_server"),
1380                            tbparams[gw].get('domain', "example.com"))
1381            remote_event_server = "%s%s" % \
1382                    (tbparams[dtb].get('eventserver', "event_server"),
1383                            tbparams[dtb].get('domain', "example.com"))
1384            seer_control = "%s%s" % \
1385                    (tbparams[gw].get('control', "control"), sdomain)
1386            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1387
1388            if self.fedkit:
1389                remote_script_dir = "/usr/local/federation/bin"
1390                local_script_dir = "/usr/local/federation/bin"
1391            else:
1392                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1393                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1394
1395            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1396            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1397            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1398
1399            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1400            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1401
1402            # translate to lower case so the `hostname` hack for specifying
1403            # configuration files works.
1404            conf_file = conf_file.lower();
1405            remote_conf_file = remote_conf_file.lower();
1406
1407            if dtb == master:
1408                active = "false"
1409            elif gw == master:
1410                active = "true"
1411            elif active_end.has_key('%s-%s' % (dtb, gw)):
1412                active = "false"
1413            else:
1414                active_end['%s-%s' % (gw, dtb)] = 1
1415                active = "true"
1416
1417            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1418            print >>gwconfig, "Active: %s" % active
1419            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1420            if tunnel_iface:
1421                print >>gwconfig, "Interface: %s" % tunnel_iface
1422            print >>gwconfig, "BossName: %s" % boss
1423            print >>gwconfig, "FsName: %s" % fs
1424            print >>gwconfig, "EventServerName: %s" % event_server
1425            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1426            print >>gwconfig, "SeerControl: %s" % seer_control
1427            print >>gwconfig, "Type: %s" % type
1428            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1429            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1430                    local_script_dir
1431            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1432            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1433            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1434                    (remote_conf_dir, remote_conf_file)
1435            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1436            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1437            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1438            gwconfig.close()
1439
1440            return active == "true"
1441
1442        def __call__(self, line, allocated, tbparams):
1443            # Process gateways
1444            if not self.current_gateways:
1445                m = self.begin_gateways.match(line)
1446                if m:
1447                    self.current_gateways = m.group(1)
1448                    if allocated.has_key(self.current_gateways):
1449                        # This test should always succeed
1450                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1451                        if not os.path.exists(tb_dir):
1452                            try:
1453                                os.mkdir(tb_dir)
1454                            except IOError:
1455                                raise service_error(service_error.internal,
1456                                        "Cannot create %s" % tb_dir)
1457                    else:
1458                        # XXX
1459                        self.log.error("[gateways]: Ignoring gateways for " + \
1460                                "unknown testbed %s" % self.current_gateways)
1461                        self.current_gateways = None
1462                    return True
1463                else:
1464                    return False
1465            else:
1466                m = self.end_gateways.match(line)
1467                if m :
1468                    if m.group(1) != self.current_gateways:
1469                        raise service_error(service_error.internal,
1470                                "Mismatched gateway markers!?")
1471                    if self.control_gateway:
1472                        try:
1473                            cc = open("%s/%s/client.conf" %
1474                                    (self.tmpdir, self.current_gateways), 'w')
1475                            print >>cc, "ControlGateway: %s" % \
1476                                    self.control_gateway
1477                            if tbparams[self.master].has_key('smbshare'):
1478                                print >>cc, "SMBSHare: %s" % \
1479                                        tbparams[self.master]['smbshare']
1480                            print >>cc, "ProjectUser: %s" % \
1481                                    tbparams[self.master]['user']
1482                            print >>cc, "ProjectName: %s" % \
1483                                    tbparams[self.master]['project']
1484                            print >>cc, "ExperimentID: %s/%s" % \
1485                                    ( tbparams[self.master]['project'], \
1486                                    self.eid )
1487                            cc.close()
1488                        except IOError:
1489                            raise service_error(service_error.internal,
1490                                    "Error creating client config")
1491                        # XXX: This seer specific file should disappear
1492                        try:
1493                            cc = open("%s/%s/seer.conf" %
1494                                    (self.tmpdir, self.current_gateways),
1495                                    'w')
1496                            if self.current_gateways != self.master:
1497                                print >>cc, "ControlNode: %s" % \
1498                                        self.control_gateway
1499                            print >>cc, "ExperimentID: %s/%s" % \
1500                                    ( tbparams[self.master]['project'], \
1501                                    self.eid )
1502                            cc.close()
1503                        except IOError:
1504                            raise service_error(service_error.internal,
1505                                    "Error creating seer config")
1506                    else:
1507                        debug.error("[gateways]: No control gateway for %s" %\
1508                                    self.current_gateways)
1509                    self.current_gateways = None
1510                else:
1511                    dtb, myname, desthost, type = line.split(" ")
1512
1513                    if type == "control" or type == "both":
1514                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1515                                self.eid, 
1516                                tbparams[self.current_gateways]['project'],
1517                                tbparams[self.current_gateways]['domain'])
1518                    try:
1519                        active = self.gateway_conf_file(self.current_gateways,
1520                                self.master, self.eid, self.gw_pubkey_base,
1521                                self.gw_secretkey_base,
1522                                self.active_end, tbparams, dtb, myname,
1523                                desthost, type)
1524                    except IOError, e:
1525                        raise service_error(service_error.internal,
1526                                "Failed to write config file for %s" % \
1527                                        self.current_gateway)
1528           
1529                    gw_pubkey = "%s/keys/%s" % \
1530                            (self.tmpdir, self.gw_pubkey_base)
1531                    gw_secretkey = "%s/keys/%s" % \
1532                            (self.tmpdir, self.gw_secretkey_base)
1533
1534                    pkfile = "%s/%s/%s" % \
1535                            ( self.tmpdir, self.current_gateways, 
1536                                    self.gw_pubkey_base)
1537                    skfile = "%s/%s/%s" % \
1538                            ( self.tmpdir, self.current_gateways, 
1539                                    self.gw_secretkey_base)
1540
1541                    if not os.path.exists(pkfile):
1542                        try:
1543                            self.copy_file(gw_pubkey, pkfile)
1544                        except IOError:
1545                            service_error(service_error.internal,
1546                                    "Failed to copy pubkey file")
1547
1548                    if active and not os.path.exists(skfile):
1549                        try:
1550                            self.copy_file(gw_secretkey, skfile)
1551                        except IOError:
1552                            service_error(service_error.internal,
1553                                    "Failed to copy secretkey file")
1554                return True
1555
1556    class shunt_to_file:
1557        """
1558        Simple class to write data between two regexps to a file.
1559        """
1560        def __init__(self, begin, end, filename):
1561            """
1562            Begin shunting on a match of begin, stop on end, send data to
1563            filename.
1564            """
1565            self.begin = re.compile(begin)
1566            self.end = re.compile(end)
1567            self.in_shunt = False
1568            self.file = None
1569            self.filename = filename
1570
1571        def __call__(self, line):
1572            """
1573            Call this on each line in the input that may be shunted.
1574            """
1575            if not self.in_shunt:
1576                if self.begin.match(line):
1577                    self.in_shunt = True
1578                    try:
1579                        self.file = open(self.filename, "w")
1580                    except:
1581                        self.file = None
1582                        raise
1583                    return True
1584                else:
1585                    return False
1586            else:
1587                if self.end.match(line):
1588                    if self.file: 
1589                        self.file.close()
1590                        self.file = None
1591                    self.in_shunt = False
1592                else:
1593                    if self.file:
1594                        print >>self.file, line
1595                return True
1596
1597    class shunt_to_list:
1598        """
1599        Same interface as shunt_to_file.  Data collected in self.list, one list
1600        element per line.
1601        """
1602        def __init__(self, begin, end):
1603            self.begin = re.compile(begin)
1604            self.end = re.compile(end)
1605            self.in_shunt = False
1606            self.list = [ ]
1607       
1608        def __call__(self, line):
1609            if not self.in_shunt:
1610                if self.begin.match(line):
1611                    self.in_shunt = True
1612                    return True
1613                else:
1614                    return False
1615            else:
1616                if self.end.match(line):
1617                    self.in_shunt = False
1618                else:
1619                    self.list.append(line)
1620                return True
1621
1622    class shunt_to_string:
1623        """
1624        Same interface as shunt_to_file.  Data collected in self.str, all in
1625        one string.
1626        """
1627        def __init__(self, begin, end):
1628            self.begin = re.compile(begin)
1629            self.end = re.compile(end)
1630            self.in_shunt = False
1631            self.str = ""
1632       
1633        def __call__(self, line):
1634            if not self.in_shunt:
1635                if self.begin.match(line):
1636                    self.in_shunt = True
1637                    return True
1638                else:
1639                    return False
1640            else:
1641                if self.end.match(line):
1642                    self.in_shunt = False
1643                else:
1644                    self.str += line
1645                return True
1646
1647    def create_experiment(self, req, fid):
1648        """
1649        The external interface to experiment creation called from the
1650        dispatcher.
1651
1652        Creates a working directory, splits the incoming description using the
1653        splitter script and parses out the avrious subsections using the
1654        lcasses above.  Once each sub-experiment is created, use pooled threads
1655        to instantiate them and start it all up.
1656        """
1657
1658        if not self.auth.check_attribute(fid, 'create'):
1659            raise service_error(service_error.access, "Create access denied")
1660
1661        try:
1662            tmpdir = tempfile.mkdtemp(prefix="split-")
1663        except IOError:
1664            raise service_error(service_error.internal, "Cannot create tmp dir")
1665
1666        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1667        gw_secretkey_base = "fed.%s" % self.ssh_type
1668        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1669        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1670        tclfile = tmpdir + "/experiment.tcl"
1671        tbparams = { }
1672        try:
1673            access_user = self.accessdb[fid]
1674        except KeyError:
1675            raise service_error(service_error.internal,
1676                    "Access map and authorizer out of sync in " + \
1677                            "create_experiment for fedid %s"  % fid)
1678
1679        pid = "dummy"
1680        gid = "dummy"
1681        # XXX
1682        fail_soft = False
1683
1684        try:
1685            os.mkdir(tmpdir+"/keys")
1686        except OSError:
1687            raise service_error(service_error.internal,
1688                    "Can't make temporary dir")
1689
1690        req = req.get('CreateRequestBody', None)
1691        if not req:
1692            raise service_error(service_error.req,
1693                    "Bad request format (no CreateRequestBody)")
1694        # The tcl parser needs to read a file so put the content into that file
1695        descr=req.get('experimentdescription', None)
1696        if descr:
1697            file_content=descr.get('ns2description', None)
1698            if file_content:
1699                try:
1700                    f = open(tclfile, 'w')
1701                    f.write(file_content)
1702                    f.close()
1703                except IOError:
1704                    raise service_error(service_error.internal,
1705                            "Cannot write temp experiment description")
1706            else:
1707                raise service_error(service_error.req, 
1708                        "Only ns2descriptions supported")
1709        else:
1710            raise service_error(service_error.req, "No experiment description")
1711
1712        if req.has_key('experimentID') and \
1713                req['experimentID'].has_key('localname'):
1714            eid = req['experimentID']['localname']
1715            self.state_lock.acquire()
1716            while (self.state.has_key(eid)):
1717                eid += random.choice(string.ascii_letters)
1718            # To avoid another thread picking this localname
1719            self.state[eid] = "placeholder"
1720            self.state_lock.release()
1721        else:
1722            eid = self.exp_stem
1723            for i in range(0,5):
1724                eid += random.choice(string.ascii_letters)
1725            self.state_lock.acquire()
1726            while (self.state.has_key(eid)):
1727                eid = self.exp_stem
1728                for i in range(0,5):
1729                    eid += random.choice(string.ascii_letters)
1730            # To avoid another thread picking this localname
1731            self.state[eid] = "placeholder"
1732            self.state_lock.release()
1733
1734        try: 
1735            # This catches exceptions to clear the placeholder if necessary
1736            try:
1737                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1738            except ValueError:
1739                raise service_error(service_error.server_config, 
1740                        "Bad key type (%s)" % self.ssh_type)
1741
1742            user = req.get('user', None)
1743            if user == None:
1744                raise service_error(service_error.req, "No user")
1745
1746            master = req.get('master', None)
1747            if not master:
1748                raise service_error(service_error.req,
1749                        "No master testbed label")
1750            export_project = req.get('exportProject', None)
1751            if not export_project:
1752                raise service_error(service_error.req, "No export project")
1753           
1754            if self.splitter_url:
1755                self.log.debug("Calling remote splitter at %s" % \
1756                        self.splitter_url)
1757                split_data = self.remote_splitter(self.splitter_url,
1758                        file_content, master)
1759            else:
1760                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1761                    str(self.muxmax), '-m', master]
1762
1763                if self.fedkit:
1764                    tclcmd.append('-k')
1765
1766                if self.gatewaykit:
1767                    tclcmd.append('-K')
1768
1769                tclcmd.extend([pid, gid, eid, tclfile])
1770
1771                self.log.debug("running local splitter %s", " ".join(tclcmd))
1772                tclparser = Popen(tclcmd, stdout=PIPE)
1773                split_data = tclparser.stdout
1774
1775            allocated = { }         # Testbeds we can access
1776            started = { }           # Testbeds where a sub-experiment started
1777                                    # successfully
1778
1779            # Objects to parse the splitter output (defined above)
1780            parse_current_testbed = self.current_testbed(eid, tmpdir,
1781                    self.fedkit, self.gatewaykit)
1782            parse_allbeds = self.allbeds(self.get_access)
1783            parse_gateways = self.gateways(eid, master, tmpdir,
1784                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1785                    self.fedkit)
1786            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1787                        "^#\s+End\s+Vtopo")
1788            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1789                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1790            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1791                    "^#\s+End\s+tarfiles")
1792            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1793                    "^#\s+End\s+rpms")
1794
1795            # Working on the split data
1796            for line in split_data:
1797                line = line.rstrip()
1798                if parse_current_testbed(line, master, allocated, tbparams):
1799                    continue
1800                elif parse_allbeds(line, user, tbparams, master, export_project,
1801                        access_user):
1802                    continue
1803                elif parse_gateways(line, allocated, tbparams):
1804                    continue
1805                elif parse_vtopo(line):
1806                    continue
1807                elif parse_hostnames(line):
1808                    continue
1809                elif parse_tarfiles(line):
1810                    continue
1811                elif parse_rpms(line):
1812                    continue
1813                else:
1814                    raise service_error(service_error.internal, 
1815                            "Bad tcl parse? %s" % line)
1816            # Virtual topology and visualization
1817            vtopo = self.gentopo(parse_vtopo.str)
1818            if not vtopo:
1819                raise service_error(service_error.internal, 
1820                        "Failed to generate virtual topology")
1821
1822            vis = self.genviz(vtopo)
1823            if not vis:
1824                raise service_error(service_error.internal, 
1825                        "Failed to generate visualization")
1826           
1827            # save federant information
1828            for k in allocated.keys():
1829                tbparams[k]['federant'] = {\
1830                        'name': [ { 'localname' : eid} ],\
1831                        'emulab': tbparams[k]['emulab'],\
1832                        'allocID' : tbparams[k]['allocID'],\
1833                        'master' : k == master,\
1834                    }
1835
1836
1837            # Copy tarfiles and rpms needed at remote sites into a staging area
1838            try:
1839                if self.fedkit:
1840                    for t in self.fedkit:
1841                        parse_tarfiles.list.append(t[1])
1842                if self.gatewaykit:
1843                    for t in self.gatewaykit:
1844                        parse_tarfiles.list.append(t[1])
1845                for t in parse_tarfiles.list:
1846                    if not os.path.exists("%s/tarfiles" % tmpdir):
1847                        os.mkdir("%s/tarfiles" % tmpdir)
1848                    self.copy_file(t, "%s/tarfiles/%s" % \
1849                            (tmpdir, os.path.basename(t)))
1850                for r in parse_rpms.list:
1851                    if not os.path.exists("%s/rpms" % tmpdir):
1852                        os.mkdir("%s/rpms" % tmpdir)
1853                    self.copy_file(r, "%s/rpms/%s" % \
1854                            (tmpdir, os.path.basename(r)))
1855                # A null experiment file in case we need to create a remote
1856                # experiment from scratch
1857                f = open("%s/null.tcl" % tmpdir, "w")
1858                print >>f, """
1859set ns [new Simulator]
1860source tb_compat.tcl
1861
1862set a [$ns node]
1863
1864$ns rtproto Session
1865$ns run
1866"""
1867                f.close()
1868
1869            except IOError, e:
1870                raise service_error(service_error.internal, 
1871                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1872
1873        except service_error, e:
1874            # If something goes wrong in the parse (usually an access error)
1875            # clear the placeholder state.  From here on out the code delays
1876            # exceptions.
1877            self.state_lock.acquire()
1878            del self.state[eid]
1879            self.state_lock.release()
1880            raise e
1881
1882        thread_pool = self.thread_pool(self.nthreads)
1883        threads = [ ]
1884
1885        for tb in [ k for k in allocated.keys() if k != master]:
1886            # Create and start a thread to start the segment, and save it to
1887            # get the return value later
1888            thread_pool.wait_for_slot()
1889            t  = self.pooled_thread(\
1890                    target=self.start_segment(log=self.log,
1891                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1892                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1893                    pdata=thread_pool, trace_file=self.trace_file)
1894            threads.append(t)
1895            t.start()
1896
1897        # Wait until all finish
1898        thread_pool.wait_for_all_done()
1899
1900        # If none failed, start the master
1901        failed = [ t.getName() for t in threads if not t.rv ]
1902
1903        if len(failed) == 0:
1904            starter = self.start_segment(log=self.log, 
1905                    keyfile=self.ssh_privkey_file, debug=self.debug)
1906            if not starter(master, eid, tbparams, tmpdir):
1907                failed.append(master)
1908
1909        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1910        # If one failed clean up, unless fail_soft is set
1911        if failed:
1912            if not fail_soft:
1913                thread_pool.clear()
1914                for tb in succeeded:
1915                    # Create and start a thread to stop the segment
1916                    thread_pool.wait_for_slot()
1917                    t  = self.pooled_thread(\
1918                            target=self.stop_segment(log=self.log,
1919                                keyfile=self.ssh_privkey_file,
1920                                debug=self.debug), 
1921                            args=(tb, eid, tbparams), name=tb,
1922                            pdata=thread_pool, trace_file=self.trace_file)
1923                    t.start()
1924                # Wait until all finish
1925                thread_pool.wait_for_all_done()
1926
1927                # release the allocations
1928                for tb in tbparams.keys():
1929                    self.release_access(tb, tbparams[tb]['allocID'])
1930                # Remove the placeholder
1931                self.state_lock.acquire()
1932                del self.state[eid]
1933                self.state_lock.release()
1934
1935                raise service_error(service_error.federant,
1936                    "Swap in failed on %s" % ",".join(failed))
1937        else:
1938            self.log.info("[start_segment]: Experiment %s started" % eid)
1939
1940        # Generate an ID for the experiment (slice) and a certificate that the
1941        # allocator can use to prove they own it.  We'll ship it back through
1942        # the encrypted connection.
1943        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1944
1945        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1946
1947        # Walk up tmpdir, deleting as we go
1948        for path, dirs, files in os.walk(tmpdir, topdown=False):
1949            for f in files:
1950                os.remove(os.path.join(path, f))
1951            for d in dirs:
1952                os.rmdir(os.path.join(path, d))
1953        os.rmdir(tmpdir)
1954
1955        # The deepcopy prevents the allocation ID and other binaries from being
1956        # translated into other formats
1957        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1958                for tb in tbparams.keys() \
1959                    if tbparams[tb].has_key('federant') ],\
1960                    'vtopo': vtopo,\
1961                    'vis' : vis,
1962                    'experimentID' : [\
1963                            { 'fedid': copy.copy(expid) }, \
1964                            { 'localname': eid },\
1965                        ],\
1966                    'experimentAccess': { 'X509' : expcert },\
1967                }
1968        # remove the allocationID info from each federant
1969        for f in resp['federant']:
1970            if f.has_key('allocID'): del f['allocID']
1971
1972        # Insert the experiment into our state and update the disk copy
1973        self.state_lock.acquire()
1974        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1975                for tb in tbparams.keys() \
1976                    if tbparams[tb].has_key('federant') ],\
1977                    'vtopo': vtopo,\
1978                    'vis' : vis,
1979                    'owner': fid,
1980                    'experimentID' : [\
1981                            { 'fedid': expid }, { 'localname': eid },\
1982                        ],\
1983                }
1984        self.state[eid] = self.state[expid]
1985        if self.state_filename: self.write_state()
1986        self.state_lock.release()
1987
1988        self.auth.set_attribute(fid, expid)
1989        self.auth.set_attribute(expid, expid)
1990
1991        if not failed:
1992            return resp
1993        else:
1994            raise service_error(service_error.partial, \
1995                    "Partial swap in on %s" % ",".join(succeeded))
1996
1997    def check_experiment_access(self, fid, key):
1998        """
1999        Confirm that the fid has access to the experiment.  Though a request
2000        may be made in terms of a local name, the access attribute is always
2001        the experiment's fedid.
2002        """
2003        if not isinstance(key, fedid):
2004            self.state_lock.acquire()
2005            if self.state.has_key(key):
2006                if isinstance(self.state[key], dict):
2007                    try:
2008                        kl = [ f['fedid'] for f in \
2009                                self.state[key]['experimentID']\
2010                                    if f.has_key('fedid') ]
2011                    except KeyError:
2012                        self.state_lock.release()
2013                        raise service_error(service_error.internal, 
2014                                "No fedid for experiment %s when checking " +\
2015                                        "access(!?)" % key)
2016                    if len(kl) == 1:
2017                        key = kl[0]
2018                    else:
2019                        self.state_lock.release()
2020                        raise service_error(service_error.internal, 
2021                                "multiple fedids for experiment %s when " +\
2022                                        "checking access(!?)" % key)
2023                elif isinstance(self.state[key], str):
2024                    self.state_lock.release()
2025                    raise service_error(service_error.internal, 
2026                            ("experiment %s is placeholder.  " +\
2027                                    "Creation in progress or aborted oddly") \
2028                                    % key)
2029                else:
2030                    self.state_lock.release()
2031                    raise service_error(service_error.internal, 
2032                            "Unexpected state for %s" % key)
2033
2034            else:
2035                self.state_lock.release()
2036                raise service_error(service_error.access, "Access Denied")
2037            self.state_lock.release()
2038
2039        if self.auth.check_attribute(fid, key):
2040            return True
2041        else:
2042            raise service_error(service_error.access, "Access Denied")
2043
2044
2045
2046    def get_vtopo(self, req, fid):
2047        """
2048        Return the stored virtual topology for this experiment
2049        """
2050        rv = None
2051
2052        req = req.get('VtopoRequestBody', None)
2053        if not req:
2054            raise service_error(service_error.req,
2055                    "Bad request format (no VtopoRequestBody)")
2056        exp = req.get('experiment', None)
2057        if exp:
2058            if exp.has_key('fedid'):
2059                key = exp['fedid']
2060                keytype = "fedid"
2061            elif exp.has_key('localname'):
2062                key = exp['localname']
2063                keytype = "localname"
2064            else:
2065                raise service_error(service_error.req, "Unknown lookup type")
2066        else:
2067            raise service_error(service_error.req, "No request?")
2068
2069        self.check_experiment_access(fid, key)
2070
2071        self.state_lock.acquire()
2072        if self.state.has_key(key):
2073            rv = { 'experiment' : {keytype: key },\
2074                    'vtopo': self.state[key]['vtopo'],\
2075                }
2076        self.state_lock.release()
2077
2078        if rv: return rv
2079        else: raise service_error(service_error.req, "No such experiment")
2080
2081    def get_vis(self, req, fid):
2082        """
2083        Return the stored visualization for this experiment
2084        """
2085        rv = None
2086
2087        req = req.get('VisRequestBody', None)
2088        if not req:
2089            raise service_error(service_error.req,
2090                    "Bad request format (no VisRequestBody)")
2091        exp = req.get('experiment', None)
2092        if exp:
2093            if exp.has_key('fedid'):
2094                key = exp['fedid']
2095                keytype = "fedid"
2096            elif exp.has_key('localname'):
2097                key = exp['localname']
2098                keytype = "localname"
2099            else:
2100                raise service_error(service_error.req, "Unknown lookup type")
2101        else:
2102            raise service_error(service_error.req, "No request?")
2103
2104        self.check_experiment_access(fid, key)
2105
2106        self.state_lock.acquire()
2107        if self.state.has_key(key):
2108            rv =  { 'experiment' : {keytype: key },\
2109                    'vis': self.state[key]['vis'],\
2110                    }
2111        self.state_lock.release()
2112
2113        if rv: return rv
2114        else: raise service_error(service_error.req, "No such experiment")
2115
2116    def get_info(self, req, fid):
2117        """
2118        Return all the stored info about this experiment
2119        """
2120        rv = None
2121
2122        req = req.get('InfoRequestBody', None)
2123        if not req:
2124            raise service_error(service_error.req,
2125                    "Bad request format (no VisRequestBody)")
2126        exp = req.get('experiment', None)
2127        if exp:
2128            if exp.has_key('fedid'):
2129                key = exp['fedid']
2130                keytype = "fedid"
2131            elif exp.has_key('localname'):
2132                key = exp['localname']
2133                keytype = "localname"
2134            else:
2135                raise service_error(service_error.req, "Unknown lookup type")
2136        else:
2137            raise service_error(service_error.req, "No request?")
2138
2139        self.check_experiment_access(fid, key)
2140
2141        # The state may be massaged by the service function that called
2142        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2143        # state.
2144        self.state_lock.acquire()
2145        if self.state.has_key(key):
2146            rv = copy.deepcopy(self.state[key])
2147        self.state_lock.release()
2148        # Remove the owner info
2149        del rv['owner']
2150        # remove the allocationID info from each federant
2151        for f in rv['federant']:
2152            if f.has_key('allocID'): del f['allocID']
2153
2154        if rv: return rv
2155        else: raise service_error(service_error.req, "No such experiment")
2156
2157
2158    def terminate_experiment(self, req, fid):
2159        """
2160        Swap this experiment out on the federants and delete the shared
2161        information
2162        """
2163        tbparams = { }
2164        req = req.get('TerminateRequestBody', None)
2165        if not req:
2166            raise service_error(service_error.req,
2167                    "Bad request format (no TerminateRequestBody)")
2168        exp = req.get('experiment', None)
2169        if exp:
2170            if exp.has_key('fedid'):
2171                key = exp['fedid']
2172                keytype = "fedid"
2173            elif exp.has_key('localname'):
2174                key = exp['localname']
2175                keytype = "localname"
2176            else:
2177                raise service_error(service_error.req, "Unknown lookup type")
2178        else:
2179            raise service_error(service_error.req, "No request?")
2180
2181        self.check_experiment_access(fid, key)
2182
2183        self.state_lock.acquire()
2184        fed_exp = self.state.get(key, None)
2185
2186        if fed_exp:
2187            # This branch of the conditional holds the lock to generate a
2188            # consistent temporary tbparams variable to deallocate experiments.
2189            # It releases the lock to do the deallocations and reacquires it to
2190            # remove the experiment state when the termination is complete.
2191            ids = []
2192            #  experimentID is a list of dicts that are self-describing
2193            #  identifiers.  This finds all the fedids and localnames - the
2194            #  keys of self.state - and puts them into ids.
2195            for id in fed_exp.get('experimentID', []):
2196                if id.has_key('fedid'): ids.append(id['fedid'])
2197                if id.has_key('localname'): ids.append(id['localname'])
2198
2199            # Construct enough of the tbparams to make the stop_segment calls
2200            # work
2201            for fed in fed_exp['federant']:
2202                try:
2203                    for e in fed['name']:
2204                        eid = e.get('localname', None)
2205                        if eid: break
2206                    else:
2207                        continue
2208
2209                    p = fed['emulab']['project']
2210
2211                    project = p['name']['localname']
2212                    tb = p['testbed']['localname']
2213                    user = p['user'][0]['userID']['localname']
2214
2215                    domain = fed['emulab']['domain']
2216                    host  = fed['emulab']['ops']
2217                    aid = fed['allocID']
2218                except KeyError, e:
2219                    continue
2220                tbparams[tb] = {\
2221                        'user': user,\
2222                        'domain': domain,\
2223                        'project': project,\
2224                        'host': host,\
2225                        'eid': eid,\
2226                        'aid': aid,\
2227                    }
2228            self.state_lock.release()
2229
2230            # Stop everyone.
2231            thread_pool = self.thread_pool(self.nthreads)
2232            for tb in tbparams.keys():
2233                # Create and start a thread to stop the segment
2234                thread_pool.wait_for_slot()
2235                t  = self.pooled_thread(target=self.stop_segment, 
2236                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2237                        pdata=thread_pool, trace_file=self.trace_file)
2238                t.start()
2239            # Wait for completions
2240            thread_pool.wait_for_all_done()
2241
2242            # release the allocations
2243            for tb in tbparams.keys():
2244                self.release_access(tb, tbparams[tb]['aid'])
2245
2246            # Remove the terminated experiment
2247            self.state_lock.acquire()
2248            for id in ids:
2249                if self.state.has_key(id): del self.state[id]
2250
2251            if self.state_filename: self.write_state()
2252            self.state_lock.release()
2253
2254            return { 'experiment': exp }
2255        else:
2256            # Don't forget to release the lock
2257            self.state_lock.release()
2258            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.