source: fedd/federation/experiment_control.py @ 55c074c

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 55c074c was 55c074c, checked in by Ted Faber <faber@…>, 15 years ago

untested scripting of fs mods

  • Property mode set to 100644
File size: 83.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167
168        def parse_tarfile_list(tf):
169            """
170            Parse a tarfile list from the configuration.  This is a set of
171            paths and tarfiles separated by spaces.
172            """
173            rv = [ ]
174            if tf is not None:
175                tl = tf.split()
176                while len(tl) > 1:
177                    p, t = tl[0:2]
178                    del tl[0:2]
179                    rv.append((p, t))
180            return rv
181
182        self.thread_with_rv = experiment_control_local.pooled_thread
183        self.thread_pool = experiment_control_local.thread_pool
184
185        self.cert_file = config.get("experiment_control", "cert_file")
186        if self.cert_file:
187            self.cert_pwd = config.get("experiment_control", "cert_pwd")
188        else:
189            self.cert_file = config.get("globals", "cert_file")
190            self.cert_pwd = config.get("globals", "cert_pwd")
191
192        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
193                or config.get("globals", "trusted_certs")
194
195        self.exp_stem = "fed-stem"
196        self.log = logging.getLogger("fedd.experiment_control")
197        set_log_level(config, "experiment_control", self.log)
198        self.muxmax = 2
199        self.nthreads = 2
200        self.randomize_experiments = False
201
202        self.scp_exec = "/usr/bin/scp"
203        self.splitter = None
204        self.ssh_exec="/usr/bin/ssh"
205        self.ssh_keygen = "/usr/bin/ssh-keygen"
206        self.ssh_identity_file = None
207
208
209        self.debug = config.getboolean("experiment_control", "create_debug")
210        self.state_filename = config.get("experiment_control", 
211                "experiment_state")
212        self.splitter_url = config.get("experiment_control", "splitter_uri")
213        self.fedkit = parse_tarfile_list(\
214                config.get("experiment_control", "fedkit"))
215        self.gatewaykit = parse_tarfile_list(\
216                config.get("experiment_control", "gatewaykit"))
217        accessdb_file = config.get("experiment_control", "accessdb")
218
219        self.ssh_pubkey_file = config.get("experiment_control", 
220                "ssh_pubkey_file")
221        self.ssh_privkey_file = config.get("experiment_control",
222                "ssh_privkey_file")
223        # NB for internal master/slave ops, not experiment setup
224        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
225        self.state = { }
226        self.state_lock = Lock()
227        self.tclsh = "/usr/local/bin/otclsh"
228        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
229                config.get("experiment_control", "tcl_splitter",
230                        "/usr/testbed/lib/ns2ir/parse.tcl")
231        mapdb_file = config.get("experiment_control", "mapdb")
232        self.trace_file = sys.stderr
233
234        self.def_expstart = \
235                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
236                "/tmp/federate";
237        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
238                "FEDDIR/hosts";
239        self.def_gwstart = \
240                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
241                "/tmp/bridge.log";
242        self.def_mgwstart = \
243                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
244                "/tmp/bridge.log";
245        self.def_gwimage = "FBSD61-TUNNEL2";
246        self.def_gwtype = "pc";
247        self.local_access = { }
248
249        if auth:
250            self.auth = auth
251        else:
252            self.log.error(\
253                    "[access]: No authorizer initialized, creating local one.")
254            auth = authorizer()
255
256
257        if self.ssh_pubkey_file:
258            try:
259                f = open(self.ssh_pubkey_file, 'r')
260                self.ssh_pubkey = f.read()
261                f.close()
262            except IOError:
263                raise service_error(service_error.internal,
264                        "Cannot read sshpubkey")
265        else:
266            raise service_error(service_error.internal, 
267                    "No SSH public key file?")
268
269        if not self.ssh_privkey_file:
270            raise service_error(service_error.internal, 
271                    "No SSH public key file?")
272
273
274        if mapdb_file:
275            self.read_mapdb(mapdb_file)
276        else:
277            self.log.warn("[experiment_control] No testbed map, using defaults")
278            self.tbmap = { 
279                    'deter':'https://users.isi.deterlab.net:23235',
280                    'emulab':'https://users.isi.deterlab.net:23236',
281                    'ucb':'https://users.isi.deterlab.net:23237',
282                    }
283
284        if accessdb_file:
285                self.read_accessdb(accessdb_file)
286        else:
287            raise service_error(service_error.internal,
288                    "No accessdb specified in config")
289
290        # Grab saved state.  OK to do this w/o locking because it's read only
291        # and only one thread should be in existence that can see self.state at
292        # this point.
293        if self.state_filename:
294            self.read_state()
295
296        # Dispatch tables
297        self.soap_services = {\
298                'Create': soap_handler('Create', self.create_experiment),
299                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
300                'Vis': soap_handler('Vis', self.get_vis),
301                'Info': soap_handler('Info', self.get_info),
302                'Terminate': soap_handler('Terminate', 
303                    self.terminate_experiment),
304        }
305
306        self.xmlrpc_services = {\
307                'Create': xmlrpc_handler('Create', self.create_experiment),
308                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
309                'Vis': xmlrpc_handler('Vis', self.get_vis),
310                'Info': xmlrpc_handler('Info', self.get_info),
311                'Terminate': xmlrpc_handler('Terminate',
312                    self.terminate_experiment),
313        }
314
315    def copy_file(self, src, dest, size=1024):
316        """
317        Exceedingly simple file copy.
318        """
319        s = open(src,'r')
320        d = open(dest, 'w')
321
322        buf = "x"
323        while buf != "":
324            buf = s.read(size)
325            d.write(buf)
326        s.close()
327        d.close()
328
329    # Call while holding self.state_lock
330    def write_state(self):
331        """
332        Write a new copy of experiment state after copying the existing state
333        to a backup.
334
335        State format is a simple pickling of the state dictionary.
336        """
337        if os.access(self.state_filename, os.W_OK):
338            self.copy_file(self.state_filename, \
339                    "%s.bak" % self.state_filename)
340        try:
341            f = open(self.state_filename, 'w')
342            pickle.dump(self.state, f)
343        except IOError, e:
344            self.log.error("Can't write file %s: %s" % \
345                    (self.state_filename, e))
346        except pickle.PicklingError, e:
347            self.log.error("Pickling problem: %s" % e)
348        except TypeError, e:
349            self.log.error("Pickling problem (TypeError): %s" % e)
350
351    # Call while holding self.state_lock
352    def read_state(self):
353        """
354        Read a new copy of experiment state.  Old state is overwritten.
355
356        State format is a simple pickling of the state dictionary.
357        """
358        try:
359            f = open(self.state_filename, "r")
360            self.state = pickle.load(f)
361            self.log.debug("[read_state]: Read state from %s" % \
362                    self.state_filename)
363        except IOError, e:
364            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
365                    % (self.state_filename, e))
366        except pickle.UnpicklingError, e:
367            self.log.warning(("[read_state]: No saved state: " + \
368                    "Unpickling failed: %s") % e)
369       
370        for k in self.state.keys():
371            try:
372                # This list should only have one element in it, but phrasing it
373                # as a for loop doesn't cost much, really.  We have to find the
374                # fedid elements anyway.
375                for eid in [ f['fedid'] \
376                        for f in self.state[k]['experimentID']\
377                            if f.has_key('fedid') ]:
378                    self.auth.set_attribute(self.state[k]['owner'], eid)
379            except KeyError, e:
380                self.log.warning("[read_state]: State ownership or identity " +\
381                        "misformatted in %s: %s" % (self.state_filename, e))
382
383
384    def read_accessdb(self, accessdb_file):
385        """
386        Read the mapping from fedids that can create experiments to their name
387        in the 3-level access namespace.  All will be asserted from this
388        testbed and can include the local username and porject that will be
389        asserted on their behalf by this fedd.  Each fedid is also added to the
390        authorization system with the "create" attribute.
391        """
392        self.accessdb = {}
393        # These are the regexps for parsing the db
394        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
395        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
396                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
397        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
398                "\s*->\s*(" + name_expr + ")\s*$")
399        lineno = 0
400
401        # Parse the mappings and store in self.authdb, a dict of
402        # fedid -> (proj, user)
403        try:
404            f = open(accessdb_file, "r")
405            for line in f:
406                lineno += 1
407                line = line.strip()
408                if len(line) == 0 or line.startswith('#'):
409                    continue
410                m = project_line.match(line)
411                if m:
412                    fid = fedid(hexstr=m.group(1))
413                    project, user = m.group(2,3)
414                    if not self.accessdb.has_key(fid):
415                        self.accessdb[fid] = []
416                    self.accessdb[fid].append((project, user))
417                    continue
418
419                m = user_line.match(line)
420                if m:
421                    fid = fedid(hexstr=m.group(1))
422                    project = None
423                    user = m.group(2)
424                    if not self.accessdb.has_key(fid):
425                        self.accessdb[fid] = []
426                    self.accessdb[fid].append((project, user))
427                    continue
428                self.log.warn("[experiment_control] Error parsing access " +\
429                        "db %s at line %d" %  (accessdb_file, lineno))
430        except IOError:
431            raise service_error(service_error.internal,
432                    "Error opening/reading %s as experiment " +\
433                            "control accessdb" %  accessdb_file)
434        f.close()
435
436        # Initialize the authorization attributes
437        for fid in self.accessdb.keys():
438            self.auth.set_attribute(fid, 'create')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except IOError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def scp_file(self, file, user, host, dest=""):
467        """
468        scp a file to the remote host.  If debug is set the action is only
469        logged.
470        """
471
472        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
473                '-o', 'StrictHostKeyChecking yes', '-i', 
474                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
475        rv = 0
476
477        try:
478            dnull = open("/dev/null", "w")
479        except IOError:
480            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
481            dnull = Null
482
483        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
484        if not self.debug:
485            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
486
487        return rv == 0
488
489    def ssh_cmd(self, user, host, cmd, wname=None):
490        """
491        Run a remote command on host as user.  If debug is set, the action is
492        only logged.
493        """
494        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
495                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
496                (self.ssh_exec, self.ssh_privkey_file, 
497                        user, host, cmd)
498
499        try:
500            dnull = open("/dev/null", "r")
501        except IOError:
502            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
503            dnull = Null
504
505        self.log.debug("[ssh_cmd]: %s" % sh_str)
506        if not self.debug:
507            if dnull:
508                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
509            else:
510                sub = Popen(sh_str, shell=True)
511            return sub.wait() == 0
512        else:
513            return True
514
515    def ship_configs(self, host, user, src_dir, dest_dir):
516        """
517        Copy federant-specific configuration files to the federant.
518        """
519        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
520            return False
521        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
522            return False
523
524        for f in os.listdir(src_dir):
525            if os.path.isdir(f):
526                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
527                        "%s/%s" % (dest_dir, f)):
528                    return False
529            else:
530                if not self.scp_file("%s/%s" % (src_dir, f), 
531                        user, host, dest_dir):
532                    return False
533        return True
534
535    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
536        """
537        Start a sub-experiment on a federant.
538
539        Get the current state, modify or create as appropriate, ship data and
540        configs and start the experiment.  There are small ordering differences
541        based on the initial state of the sub-experiment.
542        """
543        # ops node in the federant
544        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
545        user = tbparams[tb]['user']     # federant user
546        pid = tbparams[tb]['project']   # federant project
547        # XXX
548        base_confs = ( "hosts",)
549        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
550        # command to test experiment state
551        expinfo_exec = "/usr/testbed/bin/expinfo" 
552        # Configuration directories on the remote machine
553        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
554        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
555        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
556        # Regular expressions to parse the expinfo response
557        state_re = re.compile("State:\s+(\w+)")
558        no_exp_re = re.compile("^No\s+such\s+experiment")
559        state = None    # Experiment state parsed from expinfo
560        # The expinfo ssh command.  Note the identity restriction to use only
561        # the identity provided in the pubkey given.
562        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
563                'StrictHostKeyChecking yes', '-i', 
564                self.ssh_privkey_file, "%s@%s" % (user, host), 
565                expinfo_exec, pid, eid]
566
567        # Get status
568        self.log.debug("[start_segment]: %s"% " ".join(cmd))
569        dev_null = None
570        try:
571            dev_null = open("/dev/null", "a")
572        except IOError, e:
573            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
574
575        if self.debug:
576            state = 'swapped'
577            rv = 0
578        else:
579            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
580            for line in status.stdout:
581                m = state_re.match(line)
582                if m: state = m.group(1)
583                else:
584                    m = no_exp_re.match(line)
585                    if m: state = "none"
586            rv = status.wait()
587
588        # If the experiment is not present the subcommand returns a non-zero
589        # return value.  If we successfully parsed a "none" outcome, ignore the
590        # return code.
591        if rv != 0 and state != 'none':
592            raise service_error(service_error.internal,
593                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
594
595        if state not in ('active', 'swapped', 'none'):
596            self.log.debug("[start_segment]:unknown state %s" % state)
597            return False
598
599        self.log.debug("[start_segment]: %s: %s" % (tb, state))
600        self.log.info("[start_segment]:transferring experiment to %s" % tb)
601
602        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
603            return False
604       
605        if state == 'none':
606            # Create a null copy of the experiment so that we capture any logs
607            # there if the modify fails.  Emulab software discards the logs
608            # from a failed startexp
609            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
610                return False
611            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
612            if not self.ssh_cmd(user, host,
613                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \
614                            % (pid, eid), "startexp"):
615                return False
616       
617        # Open up a temporary file to contain a script for setting up the
618        # filespace for the new experiment.
619        self.log.info("[start_segment]: creating script file")
620        try:
621            sf, scriptname = tempfile.mkstemp()
622            scriptfile = os.fdopen(df, 'w')
623        except IOError:
624            return False
625
626        # Script the filesystem changes
627        print >>scriptfile, "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir
628        # Clear and create the tarfiles and rpm directories
629        for d in (tarfiles_dir, rpms_dir):
630            print >>scriptfile, "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d
631            print >>scriptfile, "mkdir -p %s" % d
632        print >>scriptfile, "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir
633        print >>scriptfile, "rm %s" % scriptfile
634        close(scriptdir)
635
636        # Move the script to the remote machine
637        # XXX: could collide tempfile names on the remote host
638        if self.scp_file(scriptname, user, host, scriptname):
639            os.remove(scriptname)
640        else:
641            return False
642
643        # Execute the script (and the script's last line deletes it)
644        if not self.ssh_cmd(user, host, "sh -x %s" % scriptname):
645            return False
646
647        for f in base_confs:
648            if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
649                    "%s/%s" % (proj_dir, f)):
650                return False
651        if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
652                proj_dir):
653            return False
654        if os.path.isdir("%s/tarfiles" % tmpdir):
655            if not self.ship_configs(host, user,
656                    "%s/tarfiles" % tmpdir, tarfiles_dir):
657                return False
658        if os.path.isdir("%s/rpms" % tmpdir):
659            if not self.ship_configs(host, user,
660                    "%s/rpms" % tmpdir, tarfiles_dir):
661                return False
662        # Stage the new configuration (active experiments will stay swapped in
663        # now)
664        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
665        if not self.ssh_cmd(user, host,
666                "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
667                        (pid, eid, tclfile),
668                "modexp"):
669            return False
670        # Active experiments are still swapped, this swaps the others in.
671        if state != 'active':
672            self.log.info("[start_segment]: Swapping %s in on %s" % \
673                    (eid, tb))
674            if not self.ssh_cmd(user, host,
675                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
676                    "swapexp"):
677                return False
678        return True
679
680    def stop_segment(self, tb, eid, tbparams):
681        """
682        Stop a sub experiment by calling swapexp on the federant
683        """
684        user = tbparams[tb]['user']
685        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
686        pid = tbparams[tb]['project']
687
688        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
689        return self.ssh_cmd(user, host,
690                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
691
692       
693    def generate_ssh_keys(self, dest, type="rsa" ):
694        """
695        Generate a set of keys for the gateways to use to talk.
696
697        Keys are of type type and are stored in the required dest file.
698        """
699        valid_types = ("rsa", "dsa")
700        t = type.lower();
701        if t not in valid_types: raise ValueError
702        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
703
704        try:
705            trace = open("/dev/null", "w")
706        except IOError:
707            raise service_error(service_error.internal,
708                    "Cannot open /dev/null??");
709
710        # May raise CalledProcessError
711        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
712        rv = call(cmd, stdout=trace, stderr=trace)
713        if rv != 0:
714            raise service_error(service_error.internal, 
715                    "Cannot generate nonce ssh keys.  %s return code %d" \
716                            % (self.ssh_keygen, rv))
717
718    def gentopo(self, str):
719        """
720        Generate the topology dtat structure from the splitter's XML
721        representation of it.
722
723        The topology XML looks like:
724            <experiment>
725                <nodes>
726                    <node><vname></vname><ips>ip1:ip2</ips></node>
727                </nodes>
728                <lans>
729                    <lan>
730                        <vname></vname><vnode></vnode><ip></ip>
731                        <bandwidth></bandwidth><member>node:port</member>
732                    </lan>
733                </lans>
734        """
735        class topo_parse:
736            """
737            Parse the topology XML and create the dats structure.
738            """
739            def __init__(self):
740                # Typing of the subelements for data conversion
741                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
742                self.int_subelements = ( 'bandwidth',)
743                self.float_subelements = ( 'delay',)
744                # The final data structure
745                self.nodes = [ ]
746                self.lans =  [ ]
747                self.topo = { \
748                        'node': self.nodes,\
749                        'lan' : self.lans,\
750                    }
751                self.element = { }  # Current element being created
752                self.chars = ""     # Last text seen
753
754            def end_element(self, name):
755                # After each sub element the contents is added to the current
756                # element or to the appropriate list.
757                if name == 'node':
758                    self.nodes.append(self.element)
759                    self.element = { }
760                elif name == 'lan':
761                    self.lans.append(self.element)
762                    self.element = { }
763                elif name in self.str_subelements:
764                    self.element[name] = self.chars
765                    self.chars = ""
766                elif name in self.int_subelements:
767                    self.element[name] = int(self.chars)
768                    self.chars = ""
769                elif name in self.float_subelements:
770                    self.element[name] = float(self.chars)
771                    self.chars = ""
772
773            def found_chars(self, data):
774                self.chars += data.rstrip()
775
776
777        tp = topo_parse();
778        parser = xml.parsers.expat.ParserCreate()
779        parser.EndElementHandler = tp.end_element
780        parser.CharacterDataHandler = tp.found_chars
781
782        parser.Parse(str)
783
784        return tp.topo
785       
786
787    def genviz(self, topo):
788        """
789        Generate the visualization the virtual topology
790        """
791
792        neato = "/usr/local/bin/neato"
793        # These are used to parse neato output and to create the visualization
794        # file.
795        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
796        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
797                "%s</type></node>"
798
799        try:
800            # Node names
801            nodes = [ n['vname'] for n in topo['node'] ]
802            topo_lans = topo['lan']
803        except KeyError:
804            raise service_error(service_error.internal, "Bad topology")
805
806        lans = { }
807        links = { }
808
809        # Walk through the virtual topology, organizing the connections into
810        # 2-node connections (links) and more-than-2-node connections (lans).
811        # When a lan is created, it's added to the list of nodes (there's a
812        # node in the visualization for the lan).
813        for l in topo_lans:
814            if links.has_key(l['vname']):
815                if len(links[l['vname']]) < 2:
816                    links[l['vname']].append(l['vnode'])
817                else:
818                    nodes.append(l['vname'])
819                    lans[l['vname']] = links[l['vname']]
820                    del links[l['vname']]
821                    lans[l['vname']].append(l['vnode'])
822            elif lans.has_key(l['vname']):
823                lans[l['vname']].append(l['vnode'])
824            else:
825                links[l['vname']] = [ l['vnode'] ]
826
827
828        # Open up a temporary file for dot to turn into a visualization
829        try:
830            df, dotname = tempfile.mkstemp()
831            dotfile = os.fdopen(df, 'w')
832        except IOError:
833            raise service_error(service_error.internal,
834                    "Failed to open file in genviz")
835
836        # Generate a dot/neato input file from the links, nodes and lans
837        try:
838            print >>dotfile, "graph G {"
839            for n in nodes:
840                print >>dotfile, '\t"%s"' % n
841            for l in links.keys():
842                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
843            for l in lans.keys():
844                for n in lans[l]:
845                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
846            print >>dotfile, "}"
847            dotfile.close()
848        except TypeError:
849            raise service_error(service_error.internal,
850                    "Single endpoint link in vtopo")
851        except IOError:
852            raise service_error(service_error.internal, "Cannot write dot file")
853
854        # Use dot to create a visualization
855        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
856                '-Gpack=true', dotname], stdout=PIPE)
857
858        # Translate dot to vis format
859        vis_nodes = [ ]
860        vis = { 'node': vis_nodes }
861        for line in dot.stdout:
862            m = vis_re.match(line)
863            if m:
864                vn = m.group(1)
865                vis_node = {'name': vn, \
866                        'x': float(m.group(2)),\
867                        'y' : float(m.group(3)),\
868                    }
869                if vn in links.keys() or vn in lans.keys():
870                    vis_node['type'] = 'lan'
871                else:
872                    vis_node['type'] = 'node'
873                vis_nodes.append(vis_node)
874        rv = dot.wait()
875
876        os.remove(dotname)
877        if rv == 0 : return vis
878        else: return None
879
880    def get_access(self, tb, nodes, user, tbparam, master, export_project,
881            access_user):
882        """
883        Get access to testbed through fedd and set the parameters for that tb
884        """
885        uri = self.tbmap.get(tb, None)
886        if not uri:
887            raise service_error(serice_error.server_config, 
888                    "Unknown testbed: %s" % tb)
889
890        # currently this lumps all users into one service access group
891        service_keys = [ a for u in user \
892                for a in u.get('access', []) \
893                    if a.has_key('sshPubkey')]
894
895        if len(service_keys) == 0:
896            raise service_error(service_error.req, 
897                    "Must have at least one SSH pubkey for services")
898
899
900        for p, u in access_user:
901            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
902                    "to %s") %  ((p or "None"), u, uri))
903
904            if p:
905                # Request with user and project specified
906                req = {\
907                        'destinationTestbed' : { 'uri' : uri },
908                        'project': { 
909                            'name': {'localname': p},
910                            'user': [ {'userID': { 'localname': u } } ],
911                            },
912                        'user':  user,
913                        'allocID' : { 'localname': 'test' },
914                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
915                        'serviceAccess' : service_keys
916                    }
917            else:
918                # Request with only user specified
919                req = {\
920                        'destinationTestbed' : { 'uri' : uri },
921                        'user':  [ {'userID': { 'localname': u } } ],
922                        'allocID' : { 'localname': 'test' },
923                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
924                        'serviceAccess' : service_keys
925                    }
926
927            if tb == master:
928                # NB, the export_project parameter is a dict that includes
929                # the type
930                req['exportProject'] = export_project
931
932            # node resources if any
933            if nodes != None and len(nodes) > 0:
934                rnodes = [ ]
935                for n in nodes:
936                    rn = { }
937                    image, hw, count = n.split(":")
938                    if image: rn['image'] = [ image ]
939                    if hw: rn['hardware'] = [ hw ]
940                    if count and int(count) >0 : rn['count'] = int(count)
941                    rnodes.append(rn)
942                req['resources']= { }
943                req['resources']['node'] = rnodes
944
945            try:
946                if self.local_access.has_key(uri):
947                    # Local access call
948                    req = { 'RequestAccessRequestBody' : req }
949                    r = self.local_access[uri].RequestAccess(req, 
950                            fedid(file=self.cert_file))
951                    r = { 'RequestAccessResponseBody' : r }
952                else:
953                    r = self.call_RequestAccess(uri, req, 
954                            self.cert_file, self.cert_pwd, self.trusted_certs)
955            except service_error, e:
956                if e.code == service_error.access:
957                    self.log.debug("[get_access] Access denied")
958                    r = None
959                    continue
960                else:
961                    raise e
962
963            if r.has_key('RequestAccessResponseBody'):
964                # Through to here we have a valid response, not a fault.
965                # Access denied is a fault, so something better or worse than
966                # access denied has happened.
967                r = r['RequestAccessResponseBody']
968                self.log.debug("[get_access] Access granted")
969                break
970            else:
971                raise service_error(service_error.protocol,
972                        "Bad proxy response")
973       
974        if not r:
975            raise service_error(service_error.access, 
976                    "Access denied by %s (%s)" % (tb, uri))
977
978        e = r['emulab']
979        p = e['project']
980        tbparam[tb] = { 
981                "boss": e['boss'],
982                "host": e['ops'],
983                "domain": e['domain'],
984                "fs": e['fileServer'],
985                "eventserver": e['eventServer'],
986                "project": unpack_id(p['name']),
987                "emulab" : e,
988                "allocID" : r['allocID'],
989                }
990        # Make the testbed name be the label the user applied
991        p['testbed'] = {'localname': tb }
992
993        for u in p['user']:
994            role = u.get('role', None)
995            if role == 'experimentCreation':
996                tbparam[tb]['user'] = unpack_id(u['userID'])
997                break
998        else:
999            raise service_error(service_error.internal, 
1000                    "No createExperimentUser from %s" %tb)
1001
1002        # Add attributes to barameter space.  We don't allow attributes to
1003        # overlay any parameters already installed.
1004        for a in e['fedAttr']:
1005            try:
1006                if a['attribute'] and isinstance(a['attribute'], basestring)\
1007                        and not tbparam[tb].has_key(a['attribute'].lower()):
1008                    tbparam[tb][a['attribute'].lower()] = a['value']
1009            except KeyError:
1010                self.log.error("Bad attribute in response: %s" % a)
1011       
1012    def release_access(self, tb, aid):
1013        """
1014        Release access to testbed through fedd
1015        """
1016
1017        uri = self.tbmap.get(tb, None)
1018        if not uri:
1019            raise service_error(serice_error.server_config, 
1020                    "Unknown testbed: %s" % tb)
1021
1022        if self.local_access.has_key(uri):
1023            resp = self.local_access[uri].ReleaseAccess(\
1024                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1025                    fedid(file=self.cert_file))
1026            resp = { 'ReleaseAccessResponseBody': resp } 
1027        else:
1028            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1029                    self.cert_file, self.cert_pwd, self.trusted_certs)
1030
1031        # better error coding
1032
1033    def remote_splitter(self, uri, desc, master):
1034
1035        req = {
1036                'description' : { 'ns2description': desc },
1037                'master': master,
1038                'include_fedkit': bool(self.fedkit),
1039                'include_gatewaykit': bool(self.gatewaykit)
1040            }
1041
1042        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1043                self.trusted_certs)
1044
1045        if r.has_key('Ns2SplitResponseBody'):
1046            r = r['Ns2SplitResponseBody']
1047            if r.has_key('output'):
1048                return r['output'].splitlines()
1049            else:
1050                raise service_error(service_error.protocol, 
1051                        "Bad splitter response (no output)")
1052        else:
1053            raise service_error(service_error.protocol, "Bad splitter response")
1054       
1055    class current_testbed:
1056        """
1057        Object for collecting the current testbed description.  The testbed
1058        description is saved to a file with the local testbed variables
1059        subsittuted line by line.
1060        """
1061        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1062            def tar_list_to_string(tl):
1063                if tl is None: return None
1064
1065                rv = ""
1066                for t in tl:
1067                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1068                            (t[0], os.path.basename(t[1]))
1069                return rv
1070
1071
1072            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1073            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1074            self.current_testbed = None
1075            self.testbed_file = None
1076
1077            self.def_expstart = \
1078                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1079            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1080            self.def_gwstart = \
1081                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1082            self.def_mgwstart = \
1083                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1084            self.def_gwimage = "FBSD61-TUNNEL2";
1085            self.def_gwtype = "pc";
1086            self.def_mgwcmd = '# '
1087            self.def_mgwcmdparams = ''
1088            self.def_gwcmd = '# '
1089            self.def_gwcmdparams = ''
1090
1091            self.eid = eid
1092            self.tmpdir = tmpdir
1093            # Convert fedkit and gateway kit (which are lists of tuples) into a
1094            # substituition string.
1095            self.fedkit = tar_list_to_string(fedkit)
1096            self.gatewaykit = tar_list_to_string(gatewaykit)
1097
1098        def __call__(self, line, master, allocated, tbparams):
1099            # Capture testbed topology descriptions
1100            if self.current_testbed == None:
1101                m = self.begin_testbed.match(line)
1102                if m != None:
1103                    self.current_testbed = m.group(1)
1104                    if self.current_testbed == None:
1105                        raise service_error(service_error.req,
1106                                "Bad request format (unnamed testbed)")
1107                    allocated[self.current_testbed] = \
1108                            allocated.get(self.current_testbed,0) + 1
1109                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1110                    if not os.path.exists(tb_dir):
1111                        try:
1112                            os.mkdir(tb_dir)
1113                        except IOError:
1114                            raise service_error(service_error.internal,
1115                                    "Cannot create %s" % tb_dir)
1116                    try:
1117                        self.testbed_file = open("%s/%s.%s.tcl" %
1118                                (tb_dir, self.eid, self.current_testbed), 'w')
1119                    except IOError:
1120                        self.testbed_file = None
1121                    return True
1122                else: return False
1123            else:
1124                m = self.end_testbed.match(line)
1125                if m != None:
1126                    if m.group(1) != self.current_testbed:
1127                        raise service_error(service_error.internal, 
1128                                "Mismatched testbed markers!?")
1129                    if self.testbed_file != None: 
1130                        self.testbed_file.close()
1131                        self.testbed_file = None
1132                    self.current_testbed = None
1133                elif self.testbed_file:
1134                    # Substitute variables and put the line into the local
1135                    # testbed file.
1136                    gwtype = tbparams[self.current_testbed].get(\
1137                            'connectortype', self.def_gwtype)
1138                    gwimage = tbparams[self.current_testbed].get(\
1139                            'connectorimage', self.def_gwimage)
1140                    mgwstart = tbparams[self.current_testbed].get(\
1141                            'masterconnectorstartcmd', self.def_mgwstart)
1142                    mexpstart = tbparams[self.current_testbed].get(\
1143                            'masternodestartcmd', self.def_mexpstart)
1144                    gwstart = tbparams[self.current_testbed].get(\
1145                            'slaveconnectorstartcmd', self.def_gwstart)
1146                    expstart = tbparams[self.current_testbed].get(\
1147                            'slavenodestartcmd', self.def_expstart)
1148                    project = tbparams[self.current_testbed].get('project')
1149                    gwcmd = tbparams[self.current_testbed].get(\
1150                            'slaveconnectorcmd', self.def_gwcmd)
1151                    gwcmdparams = tbparams[self.current_testbed].get(\
1152                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1153                    mgwcmd = tbparams[self.current_testbed].get(\
1154                            'masterconnectorcmd', self.def_gwcmd)
1155                    mgwcmdparams = tbparams[self.current_testbed].get(\
1156                            'masterconnectorcmdparams', self.def_gwcmdparams)
1157                    line = re.sub("GWTYPE", gwtype, line)
1158                    line = re.sub("GWIMAGE", gwimage, line)
1159                    if self.current_testbed == master:
1160                        line = re.sub("GWSTART", mgwstart, line)
1161                        line = re.sub("EXPSTART", mexpstart, line)
1162                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1163                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1164                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1165                    else:
1166                        line = re.sub("GWSTART", gwstart, line)
1167                        line = re.sub("EXPSTART", expstart, line)
1168                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1169                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1170                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1171                    #These expansions contain EID and PROJDIR.  NB these are
1172                    # local fedkit and gatewaykit, which are strings.
1173                    if self.fedkit:
1174                        line = re.sub("FEDKIT", self.fedkit, line)
1175                    if self.gatewaykit:
1176                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1177                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1178                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1179                    line = re.sub("EID", self.eid, line)
1180                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1181                            (project, self.eid), line)
1182                    print >>self.testbed_file, line
1183                return True
1184
1185    class allbeds:
1186        """
1187        Process the Allbeds section.  Get access to each federant and save the
1188        parameters in tbparams
1189        """
1190        def __init__(self, get_access):
1191            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1192            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1193            self.in_allbeds = False
1194            self.get_access = get_access
1195
1196        def __call__(self, line, user, tbparams, master, export_project,
1197                access_user):
1198            # Testbed access parameters
1199            if not self.in_allbeds:
1200                if self.begin_allbeds.match(line):
1201                    self.in_allbeds = True
1202                    return True
1203                else:
1204                    return False
1205            else:
1206                if self.end_allbeds.match(line):
1207                    self.in_allbeds = False
1208                else:
1209                    nodes = line.split('|')
1210                    tb = nodes.pop(0)
1211                    self.get_access(tb, nodes, user, tbparams, master,
1212                            export_project, access_user)
1213                return True
1214
1215    class gateways:
1216        def __init__(self, eid, master, tmpdir, gw_pubkey,
1217                gw_secretkey, copy_file, fedkit):
1218            self.begin_gateways = \
1219                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1220            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1221            self.current_gateways = None
1222            self.control_gateway = None
1223            self.active_end = { }
1224
1225            self.eid = eid
1226            self.master = master
1227            self.tmpdir = tmpdir
1228            self.gw_pubkey_base = gw_pubkey
1229            self.gw_secretkey_base = gw_secretkey
1230
1231            self.copy_file = copy_file
1232            self.fedkit = fedkit
1233
1234
1235        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1236                active_end, tbparams, dtb, myname, desthost, type):
1237            """
1238            Produce a gateway configuration file from a gateways line.
1239            """
1240
1241            sproject = tbparams[gw].get('project', 'project')
1242            dproject = tbparams[dtb].get('project', 'project')
1243            sdomain = ".%s.%s%s" % (eid, sproject,
1244                    tbparams[gw].get('domain', ".example.com"))
1245            ddomain = ".%s.%s%s" % (eid, dproject,
1246                    tbparams[dtb].get('domain', ".example.com"))
1247            boss = tbparams[master].get('boss', "boss")
1248            fs = tbparams[master].get('fs', "fs")
1249            event_server = "%s%s" % \
1250                    (tbparams[gw].get('eventserver', "event_server"),
1251                            tbparams[gw].get('domain', "example.com"))
1252            remote_event_server = "%s%s" % \
1253                    (tbparams[dtb].get('eventserver', "event_server"),
1254                            tbparams[dtb].get('domain', "example.com"))
1255            seer_control = "%s%s" % \
1256                    (tbparams[gw].get('control', "control"), sdomain)
1257            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1258
1259            if self.fedkit:
1260                remote_script_dir = "/usr/local/federation/bin"
1261                local_script_dir = "/usr/local/federation/bin"
1262            else:
1263                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1264                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1265
1266            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1267            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1268            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1269
1270            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1271            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1272
1273            # translate to lower case so the `hostname` hack for specifying
1274            # configuration files works.
1275            conf_file = conf_file.lower();
1276            remote_conf_file = remote_conf_file.lower();
1277
1278            if dtb == master:
1279                active = "false"
1280            elif gw == master:
1281                active = "true"
1282            elif active_end.has_key('%s-%s' % (dtb, gw)):
1283                active = "false"
1284            else:
1285                active_end['%s-%s' % (gw, dtb)] = 1
1286                active = "true"
1287
1288            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1289            print >>gwconfig, "Active: %s" % active
1290            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1291            if tunnel_iface:
1292                print >>gwconfig, "Interface: %s" % tunnel_iface
1293            print >>gwconfig, "BossName: %s" % boss
1294            print >>gwconfig, "FsName: %s" % fs
1295            print >>gwconfig, "EventServerName: %s" % event_server
1296            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1297            print >>gwconfig, "SeerControl: %s" % seer_control
1298            print >>gwconfig, "Type: %s" % type
1299            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1300            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1301                    local_script_dir
1302            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1303            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1304            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1305                    (remote_conf_dir, remote_conf_file)
1306            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1307            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1308            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1309            gwconfig.close()
1310
1311            return active == "true"
1312
1313        def __call__(self, line, allocated, tbparams):
1314            # Process gateways
1315            if not self.current_gateways:
1316                m = self.begin_gateways.match(line)
1317                if m:
1318                    self.current_gateways = m.group(1)
1319                    if allocated.has_key(self.current_gateways):
1320                        # This test should always succeed
1321                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1322                        if not os.path.exists(tb_dir):
1323                            try:
1324                                os.mkdir(tb_dir)
1325                            except IOError:
1326                                raise service_error(service_error.internal,
1327                                        "Cannot create %s" % tb_dir)
1328                    else:
1329                        # XXX
1330                        self.log.error("[gateways]: Ignoring gateways for " + \
1331                                "unknown testbed %s" % self.current_gateways)
1332                        self.current_gateways = None
1333                    return True
1334                else:
1335                    return False
1336            else:
1337                m = self.end_gateways.match(line)
1338                if m :
1339                    if m.group(1) != self.current_gateways:
1340                        raise service_error(service_error.internal,
1341                                "Mismatched gateway markers!?")
1342                    if self.control_gateway:
1343                        try:
1344                            cc = open("%s/%s/client.conf" %
1345                                    (self.tmpdir, self.current_gateways), 'w')
1346                            print >>cc, "ControlGateway: %s" % \
1347                                    self.control_gateway
1348                            if tbparams[self.master].has_key('smbshare'):
1349                                print >>cc, "SMBSHare: %s" % \
1350                                        tbparams[self.master]['smbshare']
1351                            print >>cc, "ProjectUser: %s" % \
1352                                    tbparams[self.master]['user']
1353                            print >>cc, "ProjectName: %s" % \
1354                                    tbparams[self.master]['project']
1355                            print >>cc, "ExperimentID: %s/%s" % \
1356                                    ( tbparams[self.master]['project'], \
1357                                    self.eid )
1358                            cc.close()
1359                        except IOError:
1360                            raise service_error(service_error.internal,
1361                                    "Error creating client config")
1362                        # XXX: This seer specific file should disappear
1363                        try:
1364                            cc = open("%s/%s/seer.conf" %
1365                                    (self.tmpdir, self.current_gateways),
1366                                    'w')
1367                            if self.current_gateways != self.master:
1368                                print >>cc, "ControlNode: %s" % \
1369                                        self.control_gateway
1370                            print >>cc, "ExperimentID: %s/%s" % \
1371                                    ( tbparams[self.master]['project'], \
1372                                    self.eid )
1373                            cc.close()
1374                        except IOError:
1375                            raise service_error(service_error.internal,
1376                                    "Error creating seer config")
1377                    else:
1378                        debug.error("[gateways]: No control gateway for %s" %\
1379                                    self.current_gateways)
1380                    self.current_gateways = None
1381                else:
1382                    dtb, myname, desthost, type = line.split(" ")
1383
1384                    if type == "control" or type == "both":
1385                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1386                                self.eid, 
1387                                tbparams[self.current_gateways]['project'],
1388                                tbparams[self.current_gateways]['domain'])
1389                    try:
1390                        active = self.gateway_conf_file(self.current_gateways,
1391                                self.master, self.eid, self.gw_pubkey_base,
1392                                self.gw_secretkey_base,
1393                                self.active_end, tbparams, dtb, myname,
1394                                desthost, type)
1395                    except IOError, e:
1396                        raise service_error(service_error.internal,
1397                                "Failed to write config file for %s" % \
1398                                        self.current_gateway)
1399           
1400                    gw_pubkey = "%s/keys/%s" % \
1401                            (self.tmpdir, self.gw_pubkey_base)
1402                    gw_secretkey = "%s/keys/%s" % \
1403                            (self.tmpdir, self.gw_secretkey_base)
1404
1405                    pkfile = "%s/%s/%s" % \
1406                            ( self.tmpdir, self.current_gateways, 
1407                                    self.gw_pubkey_base)
1408                    skfile = "%s/%s/%s" % \
1409                            ( self.tmpdir, self.current_gateways, 
1410                                    self.gw_secretkey_base)
1411
1412                    if not os.path.exists(pkfile):
1413                        try:
1414                            self.copy_file(gw_pubkey, pkfile)
1415                        except IOError:
1416                            service_error(service_error.internal,
1417                                    "Failed to copy pubkey file")
1418
1419                    if active and not os.path.exists(skfile):
1420                        try:
1421                            self.copy_file(gw_secretkey, skfile)
1422                        except IOError:
1423                            service_error(service_error.internal,
1424                                    "Failed to copy secretkey file")
1425                return True
1426
1427    class shunt_to_file:
1428        """
1429        Simple class to write data between two regexps to a file.
1430        """
1431        def __init__(self, begin, end, filename):
1432            """
1433            Begin shunting on a match of begin, stop on end, send data to
1434            filename.
1435            """
1436            self.begin = re.compile(begin)
1437            self.end = re.compile(end)
1438            self.in_shunt = False
1439            self.file = None
1440            self.filename = filename
1441
1442        def __call__(self, line):
1443            """
1444            Call this on each line in the input that may be shunted.
1445            """
1446            if not self.in_shunt:
1447                if self.begin.match(line):
1448                    self.in_shunt = True
1449                    try:
1450                        self.file = open(self.filename, "w")
1451                    except:
1452                        self.file = None
1453                        raise
1454                    return True
1455                else:
1456                    return False
1457            else:
1458                if self.end.match(line):
1459                    if self.file: 
1460                        self.file.close()
1461                        self.file = None
1462                    self.in_shunt = False
1463                else:
1464                    if self.file:
1465                        print >>self.file, line
1466                return True
1467
1468    class shunt_to_list:
1469        """
1470        Same interface as shunt_to_file.  Data collected in self.list, one list
1471        element per line.
1472        """
1473        def __init__(self, begin, end):
1474            self.begin = re.compile(begin)
1475            self.end = re.compile(end)
1476            self.in_shunt = False
1477            self.list = [ ]
1478       
1479        def __call__(self, line):
1480            if not self.in_shunt:
1481                if self.begin.match(line):
1482                    self.in_shunt = True
1483                    return True
1484                else:
1485                    return False
1486            else:
1487                if self.end.match(line):
1488                    self.in_shunt = False
1489                else:
1490                    self.list.append(line)
1491                return True
1492
1493    class shunt_to_string:
1494        """
1495        Same interface as shunt_to_file.  Data collected in self.str, all in
1496        one string.
1497        """
1498        def __init__(self, begin, end):
1499            self.begin = re.compile(begin)
1500            self.end = re.compile(end)
1501            self.in_shunt = False
1502            self.str = ""
1503       
1504        def __call__(self, line):
1505            if not self.in_shunt:
1506                if self.begin.match(line):
1507                    self.in_shunt = True
1508                    return True
1509                else:
1510                    return False
1511            else:
1512                if self.end.match(line):
1513                    self.in_shunt = False
1514                else:
1515                    self.str += line
1516                return True
1517
1518    def create_experiment(self, req, fid):
1519        """
1520        The external interface to experiment creation called from the
1521        dispatcher.
1522
1523        Creates a working directory, splits the incoming description using the
1524        splitter script and parses out the avrious subsections using the
1525        lcasses above.  Once each sub-experiment is created, use pooled threads
1526        to instantiate them and start it all up.
1527        """
1528
1529        if not self.auth.check_attribute(fid, 'create'):
1530            raise service_error(service_error.access, "Create access denied")
1531
1532        try:
1533            tmpdir = tempfile.mkdtemp(prefix="split-")
1534        except IOError:
1535            raise service_error(service_error.internal, "Cannot create tmp dir")
1536
1537        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1538        gw_secretkey_base = "fed.%s" % self.ssh_type
1539        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1540        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1541        tclfile = tmpdir + "/experiment.tcl"
1542        tbparams = { }
1543        try:
1544            access_user = self.accessdb[fid]
1545        except KeyError:
1546            raise service_error(service_error.internal,
1547                    "Access map and authorizer out of sync in " + \
1548                            "create_experiment for fedid %s"  % fid)
1549
1550        pid = "dummy"
1551        gid = "dummy"
1552        # XXX
1553        fail_soft = False
1554
1555        try:
1556            os.mkdir(tmpdir+"/keys")
1557        except OSError:
1558            raise service_error(service_error.internal,
1559                    "Can't make temporary dir")
1560
1561        req = req.get('CreateRequestBody', None)
1562        if not req:
1563            raise service_error(service_error.req,
1564                    "Bad request format (no CreateRequestBody)")
1565        # The tcl parser needs to read a file so put the content into that file
1566        descr=req.get('experimentdescription', None)
1567        if descr:
1568            file_content=descr.get('ns2description', None)
1569            if file_content:
1570                try:
1571                    f = open(tclfile, 'w')
1572                    f.write(file_content)
1573                    f.close()
1574                except IOError:
1575                    raise service_error(service_error.internal,
1576                            "Cannot write temp experiment description")
1577            else:
1578                raise service_error(service_error.req, 
1579                        "Only ns2descriptions supported")
1580        else:
1581            raise service_error(service_error.req, "No experiment description")
1582
1583        if req.has_key('experimentID') and \
1584                req['experimentID'].has_key('localname'):
1585            eid = req['experimentID']['localname']
1586            self.state_lock.acquire()
1587            while (self.state.has_key(eid)):
1588                eid += random.choice(string.ascii_letters)
1589            # To avoid another thread picking this localname
1590            self.state[eid] = "placeholder"
1591            self.state_lock.release()
1592        else:
1593            eid = self.exp_stem
1594            for i in range(0,5):
1595                eid += random.choice(string.ascii_letters)
1596            self.state_lock.acquire()
1597            while (self.state.has_key(eid)):
1598                eid = self.exp_stem
1599                for i in range(0,5):
1600                    eid += random.choice(string.ascii_letters)
1601            # To avoid another thread picking this localname
1602            self.state[eid] = "placeholder"
1603            self.state_lock.release()
1604
1605        try: 
1606            # This catches exceptions to clear the placeholder if necessary
1607            try:
1608                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1609            except ValueError:
1610                raise service_error(service_error.server_config, 
1611                        "Bad key type (%s)" % self.ssh_type)
1612
1613            user = req.get('user', None)
1614            if user == None:
1615                raise service_error(service_error.req, "No user")
1616
1617            master = req.get('master', None)
1618            if not master:
1619                raise service_error(service_error.req,
1620                        "No master testbed label")
1621            export_project = req.get('exportProject', None)
1622            if not export_project:
1623                raise service_error(service_error.req, "No export project")
1624           
1625            if self.splitter_url:
1626                self.log.debug("Calling remote splitter at %s" % \
1627                        self.splitter_url)
1628                split_data = self.remote_splitter(self.splitter_url,
1629                        file_content, master)
1630            else:
1631                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1632                    str(self.muxmax), '-m', master]
1633
1634                if self.fedkit:
1635                    tclcmd.append('-k')
1636
1637                if self.gatewaykit:
1638                    tclcmd.append('-K')
1639
1640                tclcmd.extend([pid, gid, eid, tclfile])
1641
1642                self.log.debug("running local splitter %s", " ".join(tclcmd))
1643                tclparser = Popen(tclcmd, stdout=PIPE)
1644                split_data = tclparser.stdout
1645
1646            allocated = { }         # Testbeds we can access
1647            started = { }           # Testbeds where a sub-experiment started
1648                                # successfully
1649
1650            # Objects to parse the splitter output (defined above)
1651            parse_current_testbed = self.current_testbed(eid, tmpdir,
1652                    self.fedkit, self.gatewaykit)
1653            parse_allbeds = self.allbeds(self.get_access)
1654            parse_gateways = self.gateways(eid, master, tmpdir,
1655                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1656                    self.fedkit)
1657            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1658                        "^#\s+End\s+Vtopo")
1659            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1660                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1661            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1662                    "^#\s+End\s+tarfiles")
1663            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1664                    "^#\s+End\s+rpms")
1665
1666            # Working on the split data
1667            for line in split_data:
1668                line = line.rstrip()
1669                if parse_current_testbed(line, master, allocated, tbparams):
1670                    continue
1671                elif parse_allbeds(line, user, tbparams, master, export_project,
1672                        access_user):
1673                    continue
1674                elif parse_gateways(line, allocated, tbparams):
1675                    continue
1676                elif parse_vtopo(line):
1677                    continue
1678                elif parse_hostnames(line):
1679                    continue
1680                elif parse_tarfiles(line):
1681                    continue
1682                elif parse_rpms(line):
1683                    continue
1684                else:
1685                    raise service_error(service_error.internal, 
1686                            "Bad tcl parse? %s" % line)
1687            # Virtual topology and visualization
1688            vtopo = self.gentopo(parse_vtopo.str)
1689            if not vtopo:
1690                raise service_error(service_error.internal, 
1691                        "Failed to generate virtual topology")
1692
1693            vis = self.genviz(vtopo)
1694            if not vis:
1695                raise service_error(service_error.internal, 
1696                        "Failed to generate visualization")
1697           
1698            # save federant information
1699            for k in allocated.keys():
1700                tbparams[k]['federant'] = {\
1701                        'name': [ { 'localname' : eid} ],\
1702                        'emulab': tbparams[k]['emulab'],\
1703                        'allocID' : tbparams[k]['allocID'],\
1704                        'master' : k == master,\
1705                    }
1706
1707
1708            # Copy tarfiles and rpms needed at remote sites into a staging area
1709            try:
1710                if self.fedkit:
1711                    for t in self.fedkit:
1712                        parse_tarfiles.list.append(t[1])
1713                if self.gatewaykit:
1714                    for t in self.gatewaykit:
1715                        parse_tarfiles.list.append(t[1])
1716                for t in parse_tarfiles.list:
1717                    if not os.path.exists("%s/tarfiles" % tmpdir):
1718                        os.mkdir("%s/tarfiles" % tmpdir)
1719                    self.copy_file(t, "%s/tarfiles/%s" % \
1720                            (tmpdir, os.path.basename(t)))
1721                for r in parse_rpms.list:
1722                    if not os.path.exists("%s/rpms" % tmpdir):
1723                        os.mkdir("%s/rpms" % tmpdir)
1724                    self.copy_file(r, "%s/rpms/%s" % \
1725                            (tmpdir, os.path.basename(r)))
1726                # A null experiment file in case we need to create a remote
1727                # experiment from scratch
1728                f = open("%s/null.tcl" % tmpdir, "w")
1729                print >>f, """
1730set ns [new Simulator]
1731source tb_compat.tcl
1732
1733set a [$ns node]
1734
1735$ns rtproto Session
1736$ns run
1737"""
1738                f.close()
1739
1740            except IOError, e:
1741                raise service_error(service_error.internal, 
1742                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1743
1744        except service_error, e:
1745            # If something goes wrong in the parse (usually an access error)
1746            # clear the placeholder state.  From here on out the code delays
1747            # exceptions.
1748            self.state_lock.acquire()
1749            del self.state[eid]
1750            self.state_lock.release()
1751            raise e
1752
1753        thread_pool = self.thread_pool(self.nthreads)
1754        threads = [ ]
1755
1756        for tb in [ k for k in allocated.keys() if k != master]:
1757            # Create and start a thread to start the segment, and save it to
1758            # get the return value later
1759            thread_pool.wait_for_slot()
1760            t  = self.pooled_thread(target=self.start_segment, 
1761                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1762                    pdata=thread_pool, trace_file=self.trace_file)
1763            threads.append(t)
1764            t.start()
1765
1766        # Wait until all finish
1767        thread_pool.wait_for_all_done()
1768
1769        # If none failed, start the master
1770        failed = [ t.getName() for t in threads if not t.rv ]
1771
1772        if len(failed) == 0:
1773            if not self.start_segment(master, eid, tbparams, tmpdir):
1774                failed.append(master)
1775
1776        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1777        # If one failed clean up, unless fail_soft is set
1778        if failed:
1779            if not fail_soft:
1780                thread_pool.clear()
1781                for tb in succeeded:
1782                    # Create and start a thread to stop the segment
1783                    thread_pool.wait_for_slot()
1784                    t  = self.pooled_thread(target=self.stop_segment, 
1785                            args=(tb, eid, tbparams), name=tb,
1786                            pdata=thread_pool, trace_file=self.trace_file)
1787                    t.start()
1788                # Wait until all finish
1789                thread_pool.wait_for_all_done()
1790
1791                # release the allocations
1792                for tb in tbparams.keys():
1793                    self.release_access(tb, tbparams[tb]['allocID'])
1794                # Remove the placeholder
1795                self.state_lock.acquire()
1796                del self.state[eid]
1797                self.state_lock.release()
1798
1799                raise service_error(service_error.federant,
1800                    "Swap in failed on %s" % ",".join(failed))
1801        else:
1802            self.log.info("[start_segment]: Experiment %s started" % eid)
1803
1804        # Generate an ID for the experiment (slice) and a certificate that the
1805        # allocator can use to prove they own it.  We'll ship it back through
1806        # the encrypted connection.
1807        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1808
1809        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1810
1811        # Walk up tmpdir, deleting as we go
1812        for path, dirs, files in os.walk(tmpdir, topdown=False):
1813            for f in files:
1814                os.remove(os.path.join(path, f))
1815            for d in dirs:
1816                os.rmdir(os.path.join(path, d))
1817        os.rmdir(tmpdir)
1818
1819        # The deepcopy prevents the allocation ID and other binaries from being
1820        # translated into other formats
1821        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1822                for tb in tbparams.keys() \
1823                    if tbparams[tb].has_key('federant') ],\
1824                    'vtopo': vtopo,\
1825                    'vis' : vis,
1826                    'experimentID' : [\
1827                            { 'fedid': copy.copy(expid) }, \
1828                            { 'localname': eid },\
1829                        ],\
1830                    'experimentAccess': { 'X509' : expcert },\
1831                }
1832        # remove the allocationID info from each federant
1833        for f in resp['federant']:
1834            if f.has_key('allocID'): del f['allocID']
1835
1836        # Insert the experiment into our state and update the disk copy
1837        self.state_lock.acquire()
1838        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1839                for tb in tbparams.keys() \
1840                    if tbparams[tb].has_key('federant') ],\
1841                    'vtopo': vtopo,\
1842                    'vis' : vis,
1843                    'owner': fid,
1844                    'experimentID' : [\
1845                            { 'fedid': expid }, { 'localname': eid },\
1846                        ],\
1847                }
1848        self.state[eid] = self.state[expid]
1849        if self.state_filename: self.write_state()
1850        self.state_lock.release()
1851
1852        self.auth.set_attribute(fid, expid)
1853        self.auth.set_attribute(expid, expid)
1854
1855        if not failed:
1856            return resp
1857        else:
1858            raise service_error(service_error.partial, \
1859                    "Partial swap in on %s" % ",".join(succeeded))
1860
1861    def check_experiment_access(self, fid, key):
1862        """
1863        Confirm that the fid has access to the experiment.  Though a request
1864        may be made in terms of a local name, the access attribute is always
1865        the experiment's fedid.
1866        """
1867        if not isinstance(key, fedid):
1868            self.state_lock.acquire()
1869            if self.state.has_key(key):
1870                if isinstance(self.state[key], dict):
1871                    try:
1872                        kl = [ f['fedid'] for f in \
1873                                self.state[key]['experimentID']\
1874                                    if f.has_key('fedid') ]
1875                    except KeyError:
1876                        self.state_lock.release()
1877                        raise service_error(service_error.internal, 
1878                                "No fedid for experiment %s when checking " +\
1879                                        "access(!?)" % key)
1880                    if len(kl) == 1:
1881                        key = kl[0]
1882                    else:
1883                        self.state_lock.release()
1884                        raise service_error(service_error.internal, 
1885                                "multiple fedids for experiment %s when " +\
1886                                        "checking access(!?)" % key)
1887                elif isinstance(self.state[key], str):
1888                    self.state_lock.release()
1889                    raise service_error(service_error.internal, 
1890                            ("experiment %s is placeholder.  " +\
1891                                    "Creation in progress or aborted oddly") \
1892                                    % key)
1893                else:
1894                    self.state_lock.release()
1895                    raise service_error(service_error.internal, 
1896                            "Unexpected state for %s" % key)
1897
1898            else:
1899                self.state_lock.release()
1900                raise service_error(service_error.access, "Access Denied")
1901            self.state_lock.release()
1902
1903        if self.auth.check_attribute(fid, key):
1904            return True
1905        else:
1906            raise service_error(service_error.access, "Access Denied")
1907
1908
1909
1910    def get_vtopo(self, req, fid):
1911        """
1912        Return the stored virtual topology for this experiment
1913        """
1914        rv = None
1915
1916        req = req.get('VtopoRequestBody', None)
1917        if not req:
1918            raise service_error(service_error.req,
1919                    "Bad request format (no VtopoRequestBody)")
1920        exp = req.get('experiment', None)
1921        if exp:
1922            if exp.has_key('fedid'):
1923                key = exp['fedid']
1924                keytype = "fedid"
1925            elif exp.has_key('localname'):
1926                key = exp['localname']
1927                keytype = "localname"
1928            else:
1929                raise service_error(service_error.req, "Unknown lookup type")
1930        else:
1931            raise service_error(service_error.req, "No request?")
1932
1933        self.check_experiment_access(fid, key)
1934
1935        self.state_lock.acquire()
1936        if self.state.has_key(key):
1937            rv = { 'experiment' : {keytype: key },\
1938                    'vtopo': self.state[key]['vtopo'],\
1939                }
1940        self.state_lock.release()
1941
1942        if rv: return rv
1943        else: raise service_error(service_error.req, "No such experiment")
1944
1945    def get_vis(self, req, fid):
1946        """
1947        Return the stored visualization for this experiment
1948        """
1949        rv = None
1950
1951        req = req.get('VisRequestBody', None)
1952        if not req:
1953            raise service_error(service_error.req,
1954                    "Bad request format (no VisRequestBody)")
1955        exp = req.get('experiment', None)
1956        if exp:
1957            if exp.has_key('fedid'):
1958                key = exp['fedid']
1959                keytype = "fedid"
1960            elif exp.has_key('localname'):
1961                key = exp['localname']
1962                keytype = "localname"
1963            else:
1964                raise service_error(service_error.req, "Unknown lookup type")
1965        else:
1966            raise service_error(service_error.req, "No request?")
1967
1968        self.check_experiment_access(fid, key)
1969
1970        self.state_lock.acquire()
1971        if self.state.has_key(key):
1972            rv =  { 'experiment' : {keytype: key },\
1973                    'vis': self.state[key]['vis'],\
1974                    }
1975        self.state_lock.release()
1976
1977        if rv: return rv
1978        else: raise service_error(service_error.req, "No such experiment")
1979
1980    def get_info(self, req, fid):
1981        """
1982        Return all the stored info about this experiment
1983        """
1984        rv = None
1985
1986        req = req.get('InfoRequestBody', None)
1987        if not req:
1988            raise service_error(service_error.req,
1989                    "Bad request format (no VisRequestBody)")
1990        exp = req.get('experiment', None)
1991        if exp:
1992            if exp.has_key('fedid'):
1993                key = exp['fedid']
1994                keytype = "fedid"
1995            elif exp.has_key('localname'):
1996                key = exp['localname']
1997                keytype = "localname"
1998            else:
1999                raise service_error(service_error.req, "Unknown lookup type")
2000        else:
2001            raise service_error(service_error.req, "No request?")
2002
2003        self.check_experiment_access(fid, key)
2004
2005        # The state may be massaged by the service function that called
2006        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2007        # state.
2008        self.state_lock.acquire()
2009        if self.state.has_key(key):
2010            rv = copy.deepcopy(self.state[key])
2011        self.state_lock.release()
2012        # Remove the owner info
2013        del rv['owner']
2014        # remove the allocationID info from each federant
2015        for f in rv['federant']:
2016            if f.has_key('allocID'): del f['allocID']
2017
2018        if rv: return rv
2019        else: raise service_error(service_error.req, "No such experiment")
2020
2021
2022    def terminate_experiment(self, req, fid):
2023        """
2024        Swap this experiment out on the federants and delete the shared
2025        information
2026        """
2027        tbparams = { }
2028        req = req.get('TerminateRequestBody', None)
2029        if not req:
2030            raise service_error(service_error.req,
2031                    "Bad request format (no TerminateRequestBody)")
2032        exp = req.get('experiment', None)
2033        if exp:
2034            if exp.has_key('fedid'):
2035                key = exp['fedid']
2036                keytype = "fedid"
2037            elif exp.has_key('localname'):
2038                key = exp['localname']
2039                keytype = "localname"
2040            else:
2041                raise service_error(service_error.req, "Unknown lookup type")
2042        else:
2043            raise service_error(service_error.req, "No request?")
2044
2045        self.check_experiment_access(fid, key)
2046
2047        self.state_lock.acquire()
2048        fed_exp = self.state.get(key, None)
2049
2050        if fed_exp:
2051            # This branch of the conditional holds the lock to generate a
2052            # consistent temporary tbparams variable to deallocate experiments.
2053            # It releases the lock to do the deallocations and reacquires it to
2054            # remove the experiment state when the termination is complete.
2055            ids = []
2056            #  experimentID is a list of dicts that are self-describing
2057            #  identifiers.  This finds all the fedids and localnames - the
2058            #  keys of self.state - and puts them into ids.
2059            for id in fed_exp.get('experimentID', []):
2060                if id.has_key('fedid'): ids.append(id['fedid'])
2061                if id.has_key('localname'): ids.append(id['localname'])
2062
2063            # Construct enough of the tbparams to make the stop_segment calls
2064            # work
2065            for fed in fed_exp['federant']:
2066                try:
2067                    for e in fed['name']:
2068                        eid = e.get('localname', None)
2069                        if eid: break
2070                    else:
2071                        continue
2072
2073                    p = fed['emulab']['project']
2074
2075                    project = p['name']['localname']
2076                    tb = p['testbed']['localname']
2077                    user = p['user'][0]['userID']['localname']
2078
2079                    domain = fed['emulab']['domain']
2080                    host  = fed['emulab']['ops']
2081                    aid = fed['allocID']
2082                except KeyError, e:
2083                    continue
2084                tbparams[tb] = {\
2085                        'user': user,\
2086                        'domain': domain,\
2087                        'project': project,\
2088                        'host': host,\
2089                        'eid': eid,\
2090                        'aid': aid,\
2091                    }
2092            self.state_lock.release()
2093
2094            # Stop everyone.
2095            thread_pool = self.thread_pool(self.nthreads)
2096            for tb in tbparams.keys():
2097                # Create and start a thread to stop the segment
2098                thread_pool.wait_for_slot()
2099                t  = self.pooled_thread(target=self.stop_segment, 
2100                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2101                        pdata=thread_pool, trace_file=self.trace_file)
2102                t.start()
2103            # Wait for completions
2104            thread_pool.wait_for_all_done()
2105
2106            # release the allocations
2107            for tb in tbparams.keys():
2108                self.release_access(tb, tbparams[tb]['aid'])
2109
2110            # Remove the terminated experiment
2111            self.state_lock.acquire()
2112            for id in ids:
2113                if self.state.has_key(id): del self.state[id]
2114
2115            if self.state_filename: self.write_state()
2116            self.state_lock.release()
2117
2118            return { 'experiment': exp }
2119        else:
2120            # Don't forget to release the lock
2121            self.state_lock.release()
2122            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.