source: fedd/federation/experiment_control.py @ 5858c72

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 5858c72 was a0c12a6, checked in by Ted Faber <faber@…>, 15 years ago

Restructure the start_segment routine. Primarily this is done so that failed first time experiments will leave a log on the testbeds, but a fair amount of redundancy is removed as well. Functionally, this adds a single node swapin to the startup when there is no experiment already existing with the new name.

  • Property mode set to 100644
File size: 63.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167
168        def parse_tarfile_list(tf):
169            """
170            Parse a tarfile list from the configuration.  This is a set of
171            paths and tarfiles separated by spaces.
172            """
173            rv = [ ]
174            if tf is not None:
175                tl = tf.split()
176                while len(tl) > 1:
177                    p, t = tl[0:2]
178                    del tl[0:2]
179                    rv.append((p, t))
180            return rv
181
182        self.thread_with_rv = experiment_control_local.pooled_thread
183        self.thread_pool = experiment_control_local.thread_pool
184
185        self.cert_file = config.get("experiment_control", "cert_file")
186        if self.cert_file:
187            self.cert_pwd = config.get("experiment_control", "cert_pwd")
188        else:
189            self.cert_file = config.get("globals", "cert_file")
190            self.cert_pwd = config.get("globals", "cert_pwd")
191
192        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
193                or config.get("globals", "trusted_certs")
194
195        self.exp_stem = "fed-stem"
196        self.log = logging.getLogger("fedd.experiment_control")
197        set_log_level(config, "experiment_control", self.log)
198        self.muxmax = 2
199        self.nthreads = 2
200        self.randomize_experiments = False
201
202        self.scp_exec = "/usr/bin/scp"
203        self.splitter = None
204        self.ssh_exec="/usr/bin/ssh"
205        self.ssh_keygen = "/usr/bin/ssh-keygen"
206        self.ssh_identity_file = None
207
208
209        self.debug = config.getboolean("experiment_control", "create_debug")
210        self.state_filename = config.get("experiment_control", 
211                "experiment_state")
212        self.splitter_url = config.get("experiment_control", "splitter_uri")
213        self.fedkit = parse_tarfile_list(\
214                config.get("experiment_control", "fedkit"))
215        self.gatewaykit = parse_tarfile_list(\
216                config.get("experiment_control", "gatewaykit"))
217        accessdb_file = config.get("experiment_control", "accessdb")
218
219        self.ssh_pubkey_file = config.get("experiment_control", 
220                "ssh_pubkey_file")
221        self.ssh_privkey_file = config.get("experiment_control",
222                "ssh_privkey_file")
223        # NB for internal master/slave ops, not experiment setup
224        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
225        self.state = { }
226        self.state_lock = Lock()
227        self.tclsh = "/usr/local/bin/otclsh"
228        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
229                config.get("experiment_control", "tcl_splitter",
230                        "/usr/testbed/lib/ns2ir/parse.tcl")
231        mapdb_file = config.get("experiment_control", "mapdb")
232        self.trace_file = sys.stderr
233
234        self.def_expstart = \
235                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
236                "/tmp/federate";
237        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
238                "FEDDIR/hosts";
239        self.def_gwstart = \
240                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
241                "/tmp/bridge.log";
242        self.def_mgwstart = \
243                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
244                "/tmp/bridge.log";
245        self.def_gwimage = "FBSD61-TUNNEL2";
246        self.def_gwtype = "pc";
247        self.local_access = { }
248
249        if auth:
250            self.auth = auth
251        else:
252            self.log.error(\
253                    "[access]: No authorizer initialized, creating local one.")
254            auth = authorizer()
255
256
257        if self.ssh_pubkey_file:
258            try:
259                f = open(self.ssh_pubkey_file, 'r')
260                self.ssh_pubkey = f.read()
261                f.close()
262            except IOError:
263                raise service_error(service_error.internal,
264                        "Cannot read sshpubkey")
265        else:
266            raise service_error(service_error.internal, 
267                    "No SSH public key file?")
268
269        if not self.ssh_privkey_file:
270            raise service_error(service_error.internal, 
271                    "No SSH public key file?")
272
273
274        if mapdb_file:
275            self.read_mapdb(mapdb_file)
276        else:
277            self.log.warn("[experiment_control] No testbed map, using defaults")
278            self.tbmap = { 
279                    'deter':'https://users.isi.deterlab.net:23235',
280                    'emulab':'https://users.isi.deterlab.net:23236',
281                    'ucb':'https://users.isi.deterlab.net:23237',
282                    }
283
284        if accessdb_file:
285                self.read_accessdb(accessdb_file)
286        else:
287            raise service_error(service_error.internal,
288                    "No accessdb specified in config")
289
290        # Grab saved state.  OK to do this w/o locking because it's read only
291        # and only one thread should be in existence that can see self.state at
292        # this point.
293        if self.state_filename:
294            self.read_state()
295
296        # Dispatch tables
297        self.soap_services = {\
298                'Create': soap_handler('Create', self.create_experiment),
299                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
300                'Vis': soap_handler('Vis', self.get_vis),
301                'Info': soap_handler('Info', self.get_info),
302                'Terminate': soap_handler('Terminate', 
303                    self.terminate_experiment),
304        }
305
306        self.xmlrpc_services = {\
307                'Create': xmlrpc_handler('Create', self.create_experiment),
308                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
309                'Vis': xmlrpc_handler('Vis', self.get_vis),
310                'Info': xmlrpc_handler('Info', self.get_info),
311                'Terminate': xmlrpc_handler('Terminate',
312                    self.terminate_experiment),
313        }
314
315    def copy_file(self, src, dest, size=1024):
316        """
317        Exceedingly simple file copy.
318        """
319        s = open(src,'r')
320        d = open(dest, 'w')
321
322        buf = "x"
323        while buf != "":
324            buf = s.read(size)
325            d.write(buf)
326        s.close()
327        d.close()
328
329    # Call while holding self.state_lock
330    def write_state(self):
331        """
332        Write a new copy of experiment state after copying the existing state
333        to a backup.
334
335        State format is a simple pickling of the state dictionary.
336        """
337        if os.access(self.state_filename, os.W_OK):
338            self.copy_file(self.state_filename, \
339                    "%s.bak" % self.state_filename)
340        try:
341            f = open(self.state_filename, 'w')
342            pickle.dump(self.state, f)
343        except IOError, e:
344            self.log.error("Can't write file %s: %s" % \
345                    (self.state_filename, e))
346        except pickle.PicklingError, e:
347            self.log.error("Pickling problem: %s" % e)
348        except TypeError, e:
349            self.log.error("Pickling problem (TypeError): %s" % e)
350
351    # Call while holding self.state_lock
352    def read_state(self):
353        """
354        Read a new copy of experiment state.  Old state is overwritten.
355
356        State format is a simple pickling of the state dictionary.
357        """
358        try:
359            f = open(self.state_filename, "r")
360            self.state = pickle.load(f)
361            self.log.debug("[read_state]: Read state from %s" % \
362                    self.state_filename)
363        except IOError, e:
364            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
365                    % (self.state_filename, e))
366        except pickle.UnpicklingError, e:
367            self.log.warning(("[read_state]: No saved state: " + \
368                    "Unpickling failed: %s") % e)
369       
370        for k in self.state.keys():
371            try:
372                # This list should only have one element in it, but phrasing it
373                # as a for loop doesn't cost much, really.  We have to find the
374                # fedid elements anyway.
375                for eid in [ f['fedid'] \
376                        for f in self.state[k]['experimentID']\
377                            if f.has_key('fedid') ]:
378                    self.auth.set_attribute(self.state[k]['owner'], eid)
379            except KeyError, e:
380                self.log.warning("[read_state]: State ownership or identity " +\
381                        "misformatted in %s: %s" % (self.state_filename, e))
382
383
384    def read_accessdb(self, accessdb_file):
385        """
386        Read the mapping from fedids that can create experiments to their name
387        in the 3-level access namespace.  All will be asserted from this
388        testbed and can include the local username and porject that will be
389        asserted on their behalf by this fedd.  Each fedid is also added to the
390        authorization system with the "create" attribute.
391        """
392        self.accessdb = {}
393        # These are the regexps for parsing the db
394        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
395        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
396                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
397        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
398                "\s*->\s*(" + name_expr + ")\s*$")
399        lineno = 0
400
401        # Parse the mappings and store in self.authdb, a dict of
402        # fedid -> (proj, user)
403        try:
404            f = open(accessdb_file, "r")
405            for line in f:
406                lineno += 1
407                line = line.strip()
408                if len(line) == 0 or line.startswith('#'):
409                    continue
410                m = project_line.match(line)
411                if m:
412                    fid = fedid(hexstr=m.group(1))
413                    project, user = m.group(2,3)
414                    if not self.accessdb.has_key(fid):
415                        self.accessdb[fid] = []
416                    self.accessdb[fid].append((project, user))
417                    continue
418
419                m = user_line.match(line)
420                if m:
421                    fid = fedid(hexstr=m.group(1))
422                    project = None
423                    user = m.group(2)
424                    if not self.accessdb.has_key(fid):
425                        self.accessdb[fid] = []
426                    self.accessdb[fid].append((project, user))
427                    continue
428                self.log.warn("[experiment_control] Error parsing access " +\
429                        "db %s at line %d" %  (accessdb_file, lineno))
430        except IOError:
431            raise service_error(service_error.internal,
432                    "Error opening/reading %s as experiment " +\
433                            "control accessdb" %  accessdb_file)
434        f.close()
435
436        # Initialize the authorization attributes
437        for fid in self.accessdb.keys():
438            self.auth.set_attribute(fid, 'create')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except IOError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def scp_file(self, file, user, host, dest=""):
467        """
468        scp a file to the remote host.  If debug is set the action is only
469        logged.
470        """
471
472        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
473                '-o', 'StrictHostKeyChecking yes', '-i', 
474                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
475        rv = 0
476
477        try:
478            dnull = open("/dev/null", "w")
479        except IOError:
480            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
481            dnull = Null
482
483        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
484        if not self.debug:
485            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
486
487        return rv == 0
488
489    def ssh_cmd(self, user, host, cmd, wname=None):
490        """
491        Run a remote command on host as user.  If debug is set, the action is
492        only logged.
493        """
494        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
495                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
496                (self.ssh_exec, self.ssh_privkey_file, 
497                        user, host, cmd)
498
499        try:
500            dnull = open("/dev/null", "r")
501        except IOError:
502            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
503            dnull = Null
504
505        self.log.debug("[ssh_cmd]: %s" % sh_str)
506        if not self.debug:
507            if dnull:
508                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
509            else:
510                sub = Popen(sh_str, shell=True)
511            return sub.wait() == 0
512        else:
513            return True
514
515    def ship_configs(self, host, user, src_dir, dest_dir):
516        """
517        Copy federant-specific configuration files to the federant.
518        """
519        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
520            return False
521        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
522            return False
523
524        for f in os.listdir(src_dir):
525            if os.path.isdir(f):
526                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
527                        "%s/%s" % (dest_dir, f)):
528                    return False
529            else:
530                if not self.scp_file("%s/%s" % (src_dir, f), 
531                        user, host, dest_dir):
532                    return False
533        return True
534
535    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
536        """
537        Start a sub-experiment on a federant.
538
539        Get the current state, modify or create as appropriate, ship data and
540        configs and start the experiment.  There are small ordering differences
541        based on the initial state of the sub-experiment.
542        """
543        # ops node in the federant
544        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
545        user = tbparams[tb]['user']     # federant user
546        pid = tbparams[tb]['project']   # federant project
547        # XXX
548        base_confs = ( "hosts",)
549        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
550        # command to test experiment state
551        expinfo_exec = "/usr/testbed/bin/expinfo" 
552        # Configuration directories on the remote machine
553        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
554        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
555        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
556        # Regular expressions to parse the expinfo response
557        state_re = re.compile("State:\s+(\w+)")
558        no_exp_re = re.compile("^No\s+such\s+experiment")
559        state = None    # Experiment state parsed from expinfo
560        # The expinfo ssh command.  Note the identity restriction to use only
561        # the identity provided in the pubkey given.
562        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
563                'StrictHostKeyChecking yes', '-i', 
564                self.ssh_privkey_file, "%s@%s" % (user, host), 
565                expinfo_exec, pid, eid]
566
567        # Get status
568        self.log.debug("[start_segment]: %s"% " ".join(cmd))
569        dev_null = None
570        try:
571            dev_null = open("/dev/null", "a")
572        except IOError, e:
573            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
574
575        if self.debug:
576            state = 'swapped'
577            rv = 0
578        else:
579            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
580            for line in status.stdout:
581                m = state_re.match(line)
582                if m: state = m.group(1)
583                else:
584                    m = no_exp_re.match(line)
585                    if m: state = "none"
586            rv = status.wait()
587
588        # If the experiment is not present the subcommand returns a non-zero
589        # return value.  If we successfully parsed a "none" outcome, ignore the
590        # return code.
591        if rv != 0 and state != 'none':
592            raise service_error(service_error.internal,
593                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
594
595        if state not in ('active', 'swapped', 'none'):
596            self.log.debug("[start_segment]:unknown state %s" % state)
597            return False
598
599        self.log.debug("[start_segment]: %s: %s" % (tb, state))
600        self.log.info("[start_segment]:transferring experiment to %s" % tb)
601
602        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
603            return False
604        # Clear the federation config dirs
605        if not self.ssh_cmd(user, host, 
606                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
607            return False
608        # Clear and create the tarfiles and rpm directories
609        for d in (tarfiles_dir, rpms_dir):
610            if not self.ssh_cmd(user, host, 
611                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
612                return False
613            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
614                    "create tarfiles"):
615                return False
616       
617        if state == 'none':
618            # Create a null copy of the experiment so that we capture any logs
619            # there if the modify fails.  Emulab software discards the logs
620            # from a failed startexp
621            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
622                return False
623            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
624            if not self.ssh_cmd(user, host,
625                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \
626                            % (pid, eid), "startexp"):
627                return False
628       
629        # Create the federation config dirs
630        if not self.ssh_cmd(user, host, 
631                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
632            return False
633        for f in base_confs:
634            if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
635                    "%s/%s" % (proj_dir, f)):
636                return False
637        if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
638                proj_dir):
639            return False
640        if os.path.isdir("%s/tarfiles" % tmpdir):
641            if not self.ship_configs(host, user,
642                    "%s/tarfiles" % tmpdir, tarfiles_dir):
643                return False
644        if os.path.isdir("%s/rpms" % tmpdir):
645            if not self.ship_configs(host, user,
646                    "%s/rpms" % tmpdir, tarfiles_dir):
647                return False
648        # Stage the new configuration (active experiments will stay swapped in
649        # now)
650        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
651        if not self.ssh_cmd(user, host,
652                "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
653                        (pid, eid, tclfile),
654                "modexp"):
655            return False
656        # Active experiments are still swapped, this swaps the others in.
657        if state != 'active':
658            self.log.info("[start_segment]: Swapping %s in on %s" % \
659                    (eid, tb))
660            if not self.ssh_cmd(user, host,
661                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
662                    "swapexp"):
663                return False
664        return True
665
666    def stop_segment(self, tb, eid, tbparams):
667        """
668        Stop a sub experiment by calling swapexp on the federant
669        """
670        user = tbparams[tb]['user']
671        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
672        pid = tbparams[tb]['project']
673
674        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
675        return self.ssh_cmd(user, host,
676                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
677
678       
679    def generate_ssh_keys(self, dest, type="rsa" ):
680        """
681        Generate a set of keys for the gateways to use to talk.
682
683        Keys are of type type and are stored in the required dest file.
684        """
685        valid_types = ("rsa", "dsa")
686        t = type.lower();
687        if t not in valid_types: raise ValueError
688        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
689
690        try:
691            trace = open("/dev/null", "w")
692        except IOError:
693            raise service_error(service_error.internal,
694                    "Cannot open /dev/null??");
695
696        # May raise CalledProcessError
697        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
698        rv = call(cmd, stdout=trace, stderr=trace)
699        if rv != 0:
700            raise service_error(service_error.internal, 
701                    "Cannot generate nonce ssh keys.  %s return code %d" \
702                            % (self.ssh_keygen, rv))
703
704    def gentopo(self, str):
705        """
706        Generate the topology dtat structure from the splitter's XML
707        representation of it.
708
709        The topology XML looks like:
710            <experiment>
711                <nodes>
712                    <node><vname></vname><ips>ip1:ip2</ips></node>
713                </nodes>
714                <lans>
715                    <lan>
716                        <vname></vname><vnode></vnode><ip></ip>
717                        <bandwidth></bandwidth><member>node:port</member>
718                    </lan>
719                </lans>
720        """
721        class topo_parse:
722            """
723            Parse the topology XML and create the dats structure.
724            """
725            def __init__(self):
726                # Typing of the subelements for data conversion
727                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
728                self.int_subelements = ( 'bandwidth',)
729                self.float_subelements = ( 'delay',)
730                # The final data structure
731                self.nodes = [ ]
732                self.lans =  [ ]
733                self.topo = { \
734                        'node': self.nodes,\
735                        'lan' : self.lans,\
736                    }
737                self.element = { }  # Current element being created
738                self.chars = ""     # Last text seen
739
740            def end_element(self, name):
741                # After each sub element the contents is added to the current
742                # element or to the appropriate list.
743                if name == 'node':
744                    self.nodes.append(self.element)
745                    self.element = { }
746                elif name == 'lan':
747                    self.lans.append(self.element)
748                    self.element = { }
749                elif name in self.str_subelements:
750                    self.element[name] = self.chars
751                    self.chars = ""
752                elif name in self.int_subelements:
753                    self.element[name] = int(self.chars)
754                    self.chars = ""
755                elif name in self.float_subelements:
756                    self.element[name] = float(self.chars)
757                    self.chars = ""
758
759            def found_chars(self, data):
760                self.chars += data.rstrip()
761
762
763        tp = topo_parse();
764        parser = xml.parsers.expat.ParserCreate()
765        parser.EndElementHandler = tp.end_element
766        parser.CharacterDataHandler = tp.found_chars
767
768        parser.Parse(str)
769
770        return tp.topo
771       
772
773    def genviz(self, topo):
774        """
775        Generate the visualization the virtual topology
776        """
777
778        neato = "/usr/local/bin/neato"
779        # These are used to parse neato output and to create the visualization
780        # file.
781        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
782        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
783                "%s</type></node>"
784
785        try:
786            # Node names
787            nodes = [ n['vname'] for n in topo['node'] ]
788            topo_lans = topo['lan']
789        except KeyError:
790            raise service_error(service_error.internal, "Bad topology")
791
792        lans = { }
793        links = { }
794
795        # Walk through the virtual topology, organizing the connections into
796        # 2-node connections (links) and more-than-2-node connections (lans).
797        # When a lan is created, it's added to the list of nodes (there's a
798        # node in the visualization for the lan).
799        for l in topo_lans:
800            if links.has_key(l['vname']):
801                if len(links[l['vname']]) < 2:
802                    links[l['vname']].append(l['vnode'])
803                else:
804                    nodes.append(l['vname'])
805                    lans[l['vname']] = links[l['vname']]
806                    del links[l['vname']]
807                    lans[l['vname']].append(l['vnode'])
808            elif lans.has_key(l['vname']):
809                lans[l['vname']].append(l['vnode'])
810            else:
811                links[l['vname']] = [ l['vnode'] ]
812
813
814        # Open up a temporary file for dot to turn into a visualization
815        try:
816            df, dotname = tempfile.mkstemp()
817            dotfile = os.fdopen(df, 'w')
818        except IOError:
819            raise service_error(service_error.internal,
820                    "Failed to open file in genviz")
821
822        # Generate a dot/neato input file from the links, nodes and lans
823        try:
824            print >>dotfile, "graph G {"
825            for n in nodes:
826                print >>dotfile, '\t"%s"' % n
827            for l in links.keys():
828                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
829            for l in lans.keys():
830                for n in lans[l]:
831                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
832            print >>dotfile, "}"
833            dotfile.close()
834        except TypeError:
835            raise service_error(service_error.internal,
836                    "Single endpoint link in vtopo")
837        except IOError:
838            raise service_error(service_error.internal, "Cannot write dot file")
839
840        # Use dot to create a visualization
841        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
842                '-Gpack=true', dotname], stdout=PIPE)
843
844        # Translate dot to vis format
845        vis_nodes = [ ]
846        vis = { 'node': vis_nodes }
847        for line in dot.stdout:
848            m = vis_re.match(line)
849            if m:
850                vn = m.group(1)
851                vis_node = {'name': vn, \
852                        'x': float(m.group(2)),\
853                        'y' : float(m.group(3)),\
854                    }
855                if vn in links.keys() or vn in lans.keys():
856                    vis_node['type'] = 'lan'
857                else:
858                    vis_node['type'] = 'node'
859                vis_nodes.append(vis_node)
860        rv = dot.wait()
861
862        os.remove(dotname)
863        if rv == 0 : return vis
864        else: return None
865
866    def get_access(self, tb, nodes, user, tbparam, master, export_project,
867            access_user):
868        """
869        Get access to testbed through fedd and set the parameters for that tb
870        """
871        uri = self.tbmap.get(tb, None)
872        if not uri:
873            raise service_error(serice_error.server_config, 
874                    "Unknown testbed: %s" % tb)
875
876        # currently this lumps all users into one service access group
877        service_keys = [ a for u in user \
878                for a in u.get('access', []) \
879                    if a.has_key('sshPubkey')]
880
881        if len(service_keys) == 0:
882            raise service_error(service_error.req, 
883                    "Must have at least one SSH pubkey for services")
884
885
886        for p, u in access_user:
887            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
888                    "to %s") %  ((p or "None"), u, uri))
889
890            if p:
891                # Request with user and project specified
892                req = {\
893                        'destinationTestbed' : { 'uri' : uri },
894                        'project': { 
895                            'name': {'localname': p},
896                            'user': [ {'userID': { 'localname': u } } ],
897                            },
898                        'user':  user,
899                        'allocID' : { 'localname': 'test' },
900                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
901                        'serviceAccess' : service_keys
902                    }
903            else:
904                # Request with only user specified
905                req = {\
906                        'destinationTestbed' : { 'uri' : uri },
907                        'user':  [ {'userID': { 'localname': u } } ],
908                        'allocID' : { 'localname': 'test' },
909                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
910                        'serviceAccess' : service_keys
911                    }
912
913            if tb == master:
914                # NB, the export_project parameter is a dict that includes
915                # the type
916                req['exportProject'] = export_project
917
918            # node resources if any
919            if nodes != None and len(nodes) > 0:
920                rnodes = [ ]
921                for n in nodes:
922                    rn = { }
923                    image, hw, count = n.split(":")
924                    if image: rn['image'] = [ image ]
925                    if hw: rn['hardware'] = [ hw ]
926                    if count and int(count) >0 : rn['count'] = int(count)
927                    rnodes.append(rn)
928                req['resources']= { }
929                req['resources']['node'] = rnodes
930
931            try:
932                if self.local_access.has_key(uri):
933                    # Local access call
934                    req = { 'RequestAccessRequestBody' : req }
935                    r = self.local_access[uri].RequestAccess(req, 
936                            fedid(file=self.cert_file))
937                    r = { 'RequestAccessResponseBody' : r }
938                else:
939                    r = self.call_RequestAccess(uri, req, 
940                            self.cert_file, self.cert_pwd, self.trusted_certs)
941            except service_error, e:
942                if e.code == service_error.access:
943                    self.log.debug("[get_access] Access denied")
944                    r = None
945                    continue
946                else:
947                    raise e
948
949            if r.has_key('RequestAccessResponseBody'):
950                # Through to here we have a valid response, not a fault.
951                # Access denied is a fault, so something better or worse than
952                # access denied has happened.
953                r = r['RequestAccessResponseBody']
954                self.log.debug("[get_access] Access granted")
955                break
956            else:
957                raise service_error(service_error.protocol,
958                        "Bad proxy response")
959       
960        if not r:
961            raise service_error(service_error.access, 
962                    "Access denied by %s (%s)" % (tb, uri))
963
964        e = r['emulab']
965        p = e['project']
966        tbparam[tb] = { 
967                "boss": e['boss'],
968                "host": e['ops'],
969                "domain": e['domain'],
970                "fs": e['fileServer'],
971                "eventserver": e['eventServer'],
972                "project": unpack_id(p['name']),
973                "emulab" : e,
974                "allocID" : r['allocID'],
975                }
976        # Make the testbed name be the label the user applied
977        p['testbed'] = {'localname': tb }
978
979        for u in p['user']:
980            role = u.get('role', None)
981            if role == 'experimentCreation':
982                tbparam[tb]['user'] = unpack_id(u['userID'])
983                break
984        else:
985            raise service_error(service_error.internal, 
986                    "No createExperimentUser from %s" %tb)
987
988        # Add attributes to barameter space.  We don't allow attributes to
989        # overlay any parameters already installed.
990        for a in e['fedAttr']:
991            try:
992                if a['attribute'] and isinstance(a['attribute'], basestring)\
993                        and not tbparam[tb].has_key(a['attribute'].lower()):
994                    tbparam[tb][a['attribute'].lower()] = a['value']
995            except KeyError:
996                self.log.error("Bad attribute in response: %s" % a)
997       
998    def release_access(self, tb, aid):
999        """
1000        Release access to testbed through fedd
1001        """
1002
1003        uri = self.tbmap.get(tb, None)
1004        if not uri:
1005            raise service_error(serice_error.server_config, 
1006                    "Unknown testbed: %s" % tb)
1007
1008        if self.local_access.has_key(uri):
1009            resp = self.local_access[uri].ReleaseAccess(\
1010                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1011                    fedid(file=self.cert_file))
1012            resp = { 'ReleaseAccessResponseBody': resp } 
1013        else:
1014            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1015                    self.cert_file, self.cert_pwd, self.trusted_certs)
1016
1017        # better error coding
1018
1019    def remote_splitter(self, uri, desc, master):
1020
1021        req = {
1022                'description' : { 'ns2description': desc },
1023                'master': master,
1024                'include_fedkit': bool(self.fedkit),
1025                'include_gatewaykit': bool(self.gatewaykit)
1026            }
1027
1028        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1029                self.trusted_certs)
1030
1031        if r.has_key('Ns2SplitResponseBody'):
1032            r = r['Ns2SplitResponseBody']
1033            if r.has_key('output'):
1034                return r['output'].splitlines()
1035            else:
1036                raise service_error(service_error.protocol, 
1037                        "Bad splitter response (no output)")
1038        else:
1039            raise service_error(service_error.protocol, "Bad splitter response")
1040       
1041    class current_testbed:
1042        """
1043        Object for collecting the current testbed description.  The testbed
1044        description is saved to a file with the local testbed variables
1045        subsittuted line by line.
1046        """
1047        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1048            def tar_list_to_string(tl):
1049                if tl is None: return None
1050
1051                rv = ""
1052                for t in tl:
1053                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1054                            (t[0], os.path.basename(t[1]))
1055                return rv
1056
1057
1058            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1059            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1060            self.current_testbed = None
1061            self.testbed_file = None
1062
1063            self.def_expstart = \
1064                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1065            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1066            self.def_gwstart = \
1067                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1068            self.def_mgwstart = \
1069                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1070            self.def_gwimage = "FBSD61-TUNNEL2";
1071            self.def_gwtype = "pc";
1072            self.def_mgwcmd = '# '
1073            self.def_mgwcmdparams = ''
1074            self.def_gwcmd = '# '
1075            self.def_gwcmdparams = ''
1076
1077            self.eid = eid
1078            self.tmpdir = tmpdir
1079            # Convert fedkit and gateway kit (which are lists of tuples) into a
1080            # substituition string.
1081            self.fedkit = tar_list_to_string(fedkit)
1082            self.gatewaykit = tar_list_to_string(gatewaykit)
1083
1084        def __call__(self, line, master, allocated, tbparams):
1085            # Capture testbed topology descriptions
1086            if self.current_testbed == None:
1087                m = self.begin_testbed.match(line)
1088                if m != None:
1089                    self.current_testbed = m.group(1)
1090                    if self.current_testbed == None:
1091                        raise service_error(service_error.req,
1092                                "Bad request format (unnamed testbed)")
1093                    allocated[self.current_testbed] = \
1094                            allocated.get(self.current_testbed,0) + 1
1095                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1096                    if not os.path.exists(tb_dir):
1097                        try:
1098                            os.mkdir(tb_dir)
1099                        except IOError:
1100                            raise service_error(service_error.internal,
1101                                    "Cannot create %s" % tb_dir)
1102                    try:
1103                        self.testbed_file = open("%s/%s.%s.tcl" %
1104                                (tb_dir, self.eid, self.current_testbed), 'w')
1105                    except IOError:
1106                        self.testbed_file = None
1107                    return True
1108                else: return False
1109            else:
1110                m = self.end_testbed.match(line)
1111                if m != None:
1112                    if m.group(1) != self.current_testbed:
1113                        raise service_error(service_error.internal, 
1114                                "Mismatched testbed markers!?")
1115                    if self.testbed_file != None: 
1116                        self.testbed_file.close()
1117                        self.testbed_file = None
1118                    self.current_testbed = None
1119                elif self.testbed_file:
1120                    # Substitute variables and put the line into the local
1121                    # testbed file.
1122                    gwtype = tbparams[self.current_testbed].get(\
1123                            'connectortype', self.def_gwtype)
1124                    gwimage = tbparams[self.current_testbed].get(\
1125                            'connectorimage', self.def_gwimage)
1126                    mgwstart = tbparams[self.current_testbed].get(\
1127                            'masterconnectorstartcmd', self.def_mgwstart)
1128                    mexpstart = tbparams[self.current_testbed].get(\
1129                            'masternodestartcmd', self.def_mexpstart)
1130                    gwstart = tbparams[self.current_testbed].get(\
1131                            'slaveconnectorstartcmd', self.def_gwstart)
1132                    expstart = tbparams[self.current_testbed].get(\
1133                            'slavenodestartcmd', self.def_expstart)
1134                    project = tbparams[self.current_testbed].get('project')
1135                    gwcmd = tbparams[self.current_testbed].get(\
1136                            'slaveconnectorcmd', self.def_gwcmd)
1137                    gwcmdparams = tbparams[self.current_testbed].get(\
1138                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1139                    mgwcmd = tbparams[self.current_testbed].get(\
1140                            'masterconnectorcmd', self.def_gwcmd)
1141                    mgwcmdparams = tbparams[self.current_testbed].get(\
1142                            'masterconnectorcmdparams', self.def_gwcmdparams)
1143                    line = re.sub("GWTYPE", gwtype, line)
1144                    line = re.sub("GWIMAGE", gwimage, line)
1145                    if self.current_testbed == master:
1146                        line = re.sub("GWSTART", mgwstart, line)
1147                        line = re.sub("EXPSTART", mexpstart, line)
1148                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1149                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1150                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1151                    else:
1152                        line = re.sub("GWSTART", gwstart, line)
1153                        line = re.sub("EXPSTART", expstart, line)
1154                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1155                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1156                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1157                    #These expansions contain EID and PROJDIR.  NB these are
1158                    # local fedkit and gatewaykit, which are strings.
1159                    if self.fedkit:
1160                        line = re.sub("FEDKIT", self.fedkit, line)
1161                    if self.gatewaykit:
1162                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1163                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1164                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1165                    line = re.sub("EID", self.eid, line)
1166                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1167                            (project, self.eid), line)
1168                    print >>self.testbed_file, line
1169                return True
1170
1171    class allbeds:
1172        """
1173        Process the Allbeds section.  Get access to each federant and save the
1174        parameters in tbparams
1175        """
1176        def __init__(self, get_access):
1177            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1178            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1179            self.in_allbeds = False
1180            self.get_access = get_access
1181
1182        def __call__(self, line, user, tbparams, master, export_project,
1183                access_user):
1184            # Testbed access parameters
1185            if not self.in_allbeds:
1186                if self.begin_allbeds.match(line):
1187                    self.in_allbeds = True
1188                    return True
1189                else:
1190                    return False
1191            else:
1192                if self.end_allbeds.match(line):
1193                    self.in_allbeds = False
1194                else:
1195                    nodes = line.split('|')
1196                    tb = nodes.pop(0)
1197                    self.get_access(tb, nodes, user, tbparams, master,
1198                            export_project, access_user)
1199                return True
1200
1201    class gateways:
1202        def __init__(self, eid, master, tmpdir, gw_pubkey,
1203                gw_secretkey, copy_file, fedkit):
1204            self.begin_gateways = \
1205                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1206            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1207            self.current_gateways = None
1208            self.control_gateway = None
1209            self.active_end = { }
1210
1211            self.eid = eid
1212            self.master = master
1213            self.tmpdir = tmpdir
1214            self.gw_pubkey_base = gw_pubkey
1215            self.gw_secretkey_base = gw_secretkey
1216
1217            self.copy_file = copy_file
1218            self.fedkit = fedkit
1219
1220
1221        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1222                active_end, tbparams, dtb, myname, desthost, type):
1223            """
1224            Produce a gateway configuration file from a gateways line.
1225            """
1226
1227            sproject = tbparams[gw].get('project', 'project')
1228            dproject = tbparams[dtb].get('project', 'project')
1229            sdomain = ".%s.%s%s" % (eid, sproject,
1230                    tbparams[gw].get('domain', ".example.com"))
1231            ddomain = ".%s.%s%s" % (eid, dproject,
1232                    tbparams[dtb].get('domain', ".example.com"))
1233            boss = tbparams[master].get('boss', "boss")
1234            fs = tbparams[master].get('fs', "fs")
1235            event_server = "%s%s" % \
1236                    (tbparams[gw].get('eventserver', "event_server"),
1237                            tbparams[gw].get('domain', "example.com"))
1238            remote_event_server = "%s%s" % \
1239                    (tbparams[dtb].get('eventserver', "event_server"),
1240                            tbparams[dtb].get('domain', "example.com"))
1241            seer_control = "%s%s" % \
1242                    (tbparams[gw].get('control', "control"), sdomain)
1243            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1244
1245            if self.fedkit:
1246                remote_script_dir = "/usr/local/federation/bin"
1247                local_script_dir = "/usr/local/federation/bin"
1248            else:
1249                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1250                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1251
1252            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1253            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1254            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1255
1256            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1257            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1258
1259            # translate to lower case so the `hostname` hack for specifying
1260            # configuration files works.
1261            conf_file = conf_file.lower();
1262            remote_conf_file = remote_conf_file.lower();
1263
1264            if dtb == master:
1265                active = "false"
1266            elif gw == master:
1267                active = "true"
1268            elif active_end.has_key('%s-%s' % (dtb, gw)):
1269                active = "false"
1270            else:
1271                active_end['%s-%s' % (gw, dtb)] = 1
1272                active = "true"
1273
1274            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1275            print >>gwconfig, "Active: %s" % active
1276            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1277            if tunnel_iface:
1278                print >>gwconfig, "Interface: %s" % tunnel_iface
1279            print >>gwconfig, "BossName: %s" % boss
1280            print >>gwconfig, "FsName: %s" % fs
1281            print >>gwconfig, "EventServerName: %s" % event_server
1282            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1283            print >>gwconfig, "SeerControl: %s" % seer_control
1284            print >>gwconfig, "Type: %s" % type
1285            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1286            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1287                    local_script_dir
1288            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1289            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1290            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1291                    (remote_conf_dir, remote_conf_file)
1292            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1293            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1294            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1295            gwconfig.close()
1296
1297            return active == "true"
1298
1299        def __call__(self, line, allocated, tbparams):
1300            # Process gateways
1301            if not self.current_gateways:
1302                m = self.begin_gateways.match(line)
1303                if m:
1304                    self.current_gateways = m.group(1)
1305                    if allocated.has_key(self.current_gateways):
1306                        # This test should always succeed
1307                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1308                        if not os.path.exists(tb_dir):
1309                            try:
1310                                os.mkdir(tb_dir)
1311                            except IOError:
1312                                raise service_error(service_error.internal,
1313                                        "Cannot create %s" % tb_dir)
1314                    else:
1315                        # XXX
1316                        self.log.error("[gateways]: Ignoring gateways for " + \
1317                                "unknown testbed %s" % self.current_gateways)
1318                        self.current_gateways = None
1319                    return True
1320                else:
1321                    return False
1322            else:
1323                m = self.end_gateways.match(line)
1324                if m :
1325                    if m.group(1) != self.current_gateways:
1326                        raise service_error(service_error.internal,
1327                                "Mismatched gateway markers!?")
1328                    if self.control_gateway:
1329                        try:
1330                            cc = open("%s/%s/client.conf" %
1331                                    (self.tmpdir, self.current_gateways), 'w')
1332                            print >>cc, "ControlGateway: %s" % \
1333                                    self.control_gateway
1334                            if tbparams[self.master].has_key('smbshare'):
1335                                print >>cc, "SMBSHare: %s" % \
1336                                        tbparams[self.master]['smbshare']
1337                            print >>cc, "ProjectUser: %s" % \
1338                                    tbparams[self.master]['user']
1339                            print >>cc, "ProjectName: %s" % \
1340                                    tbparams[self.master]['project']
1341                            print >>cc, "ExperimentID: %s/%s" % \
1342                                    ( tbparams[self.master]['project'], \
1343                                    self.eid )
1344                            cc.close()
1345                        except IOError:
1346                            raise service_error(service_error.internal,
1347                                    "Error creating client config")
1348                        # XXX: This seer specific file should disappear
1349                        try:
1350                            cc = open("%s/%s/seer.conf" %
1351                                    (self.tmpdir, self.current_gateways),
1352                                    'w')
1353                            if self.current_gateways != self.master:
1354                                print >>cc, "ControlNode: %s" % \
1355                                        self.control_gateway
1356                            print >>cc, "ExperimentID: %s/%s" % \
1357                                    ( tbparams[self.master]['project'], \
1358                                    self.eid )
1359                            cc.close()
1360                        except IOError:
1361                            raise service_error(service_error.internal,
1362                                    "Error creating seer config")
1363                    else:
1364                        debug.error("[gateways]: No control gateway for %s" %\
1365                                    self.current_gateways)
1366                    self.current_gateways = None
1367                else:
1368                    dtb, myname, desthost, type = line.split(" ")
1369
1370                    if type == "control" or type == "both":
1371                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1372                                self.eid, 
1373                                tbparams[self.current_gateways]['project'],
1374                                tbparams[self.current_gateways]['domain'])
1375                    try:
1376                        active = self.gateway_conf_file(self.current_gateways,
1377                                self.master, self.eid, self.gw_pubkey_base,
1378                                self.gw_secretkey_base,
1379                                self.active_end, tbparams, dtb, myname,
1380                                desthost, type)
1381                    except IOError, e:
1382                        raise service_error(service_error.internal,
1383                                "Failed to write config file for %s" % \
1384                                        self.current_gateway)
1385           
1386                    gw_pubkey = "%s/keys/%s" % \
1387                            (self.tmpdir, self.gw_pubkey_base)
1388                    gw_secretkey = "%s/keys/%s" % \
1389                            (self.tmpdir, self.gw_secretkey_base)
1390
1391                    pkfile = "%s/%s/%s" % \
1392                            ( self.tmpdir, self.current_gateways, 
1393                                    self.gw_pubkey_base)
1394                    skfile = "%s/%s/%s" % \
1395                            ( self.tmpdir, self.current_gateways, 
1396                                    self.gw_secretkey_base)
1397
1398                    if not os.path.exists(pkfile):
1399                        try:
1400                            self.copy_file(gw_pubkey, pkfile)
1401                        except IOError:
1402                            service_error(service_error.internal,
1403                                    "Failed to copy pubkey file")
1404
1405                    if active and not os.path.exists(skfile):
1406                        try:
1407                            self.copy_file(gw_secretkey, skfile)
1408                        except IOError:
1409                            service_error(service_error.internal,
1410                                    "Failed to copy secretkey file")
1411                return True
1412
1413    class shunt_to_file:
1414        """
1415        Simple class to write data between two regexps to a file.
1416        """
1417        def __init__(self, begin, end, filename):
1418            """
1419            Begin shunting on a match of begin, stop on end, send data to
1420            filename.
1421            """
1422            self.begin = re.compile(begin)
1423            self.end = re.compile(end)
1424            self.in_shunt = False
1425            self.file = None
1426            self.filename = filename
1427
1428        def __call__(self, line):
1429            """
1430            Call this on each line in the input that may be shunted.
1431            """
1432            if not self.in_shunt:
1433                if self.begin.match(line):
1434                    self.in_shunt = True
1435                    try:
1436                        self.file = open(self.filename, "w")
1437                    except:
1438                        self.file = None
1439                        raise
1440                    return True
1441                else:
1442                    return False
1443            else:
1444                if self.end.match(line):
1445                    if self.file: 
1446                        self.file.close()
1447                        self.file = None
1448                    self.in_shunt = False
1449                else:
1450                    if self.file:
1451                        print >>self.file, line
1452                return True
1453
1454    class shunt_to_list:
1455        """
1456        Same interface as shunt_to_file.  Data collected in self.list, one list
1457        element per line.
1458        """
1459        def __init__(self, begin, end):
1460            self.begin = re.compile(begin)
1461            self.end = re.compile(end)
1462            self.in_shunt = False
1463            self.list = [ ]
1464       
1465        def __call__(self, line):
1466            if not self.in_shunt:
1467                if self.begin.match(line):
1468                    self.in_shunt = True
1469                    return True
1470                else:
1471                    return False
1472            else:
1473                if self.end.match(line):
1474                    self.in_shunt = False
1475                else:
1476                    self.list.append(line)
1477                return True
1478
1479    class shunt_to_string:
1480        """
1481        Same interface as shunt_to_file.  Data collected in self.str, all in
1482        one string.
1483        """
1484        def __init__(self, begin, end):
1485            self.begin = re.compile(begin)
1486            self.end = re.compile(end)
1487            self.in_shunt = False
1488            self.str = ""
1489       
1490        def __call__(self, line):
1491            if not self.in_shunt:
1492                if self.begin.match(line):
1493                    self.in_shunt = True
1494                    return True
1495                else:
1496                    return False
1497            else:
1498                if self.end.match(line):
1499                    self.in_shunt = False
1500                else:
1501                    self.str += line
1502                return True
1503
1504    def create_experiment(self, req, fid):
1505        """
1506        The external interface to experiment creation called from the
1507        dispatcher.
1508
1509        Creates a working directory, splits the incoming description using the
1510        splitter script and parses out the avrious subsections using the
1511        lcasses above.  Once each sub-experiment is created, use pooled threads
1512        to instantiate them and start it all up.
1513        """
1514
1515        if not self.auth.check_attribute(fid, 'create'):
1516            raise service_error(service_error.access, "Create access denied")
1517
1518        try:
1519            tmpdir = tempfile.mkdtemp(prefix="split-")
1520        except IOError:
1521            raise service_error(service_error.internal, "Cannot create tmp dir")
1522
1523        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1524        gw_secretkey_base = "fed.%s" % self.ssh_type
1525        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1526        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1527        tclfile = tmpdir + "/experiment.tcl"
1528        tbparams = { }
1529        try:
1530            access_user = self.accessdb[fid]
1531        except KeyError:
1532            raise service_error(service_error.internal,
1533                    "Access map and authorizer out of sync in " + \
1534                            "create_experiment for fedid %s"  % fid)
1535
1536        pid = "dummy"
1537        gid = "dummy"
1538        # XXX
1539        fail_soft = False
1540
1541        try:
1542            os.mkdir(tmpdir+"/keys")
1543        except OSError:
1544            raise service_error(service_error.internal,
1545                    "Can't make temporary dir")
1546
1547        req = req.get('CreateRequestBody', None)
1548        if not req:
1549            raise service_error(service_error.req,
1550                    "Bad request format (no CreateRequestBody)")
1551        # The tcl parser needs to read a file so put the content into that file
1552        descr=req.get('experimentdescription', None)
1553        if descr:
1554            file_content=descr.get('ns2description', None)
1555            if file_content:
1556                try:
1557                    f = open(tclfile, 'w')
1558                    f.write(file_content)
1559                    f.close()
1560                except IOError:
1561                    raise service_error(service_error.internal,
1562                            "Cannot write temp experiment description")
1563            else:
1564                raise service_error(service_error.req, 
1565                        "Only ns2descriptions supported")
1566        else:
1567            raise service_error(service_error.req, "No experiment description")
1568
1569        if req.has_key('experimentID') and \
1570                req['experimentID'].has_key('localname'):
1571            eid = req['experimentID']['localname']
1572            self.state_lock.acquire()
1573            while (self.state.has_key(eid)):
1574                eid += random.choice(string.ascii_letters)
1575            # To avoid another thread picking this localname
1576            self.state[eid] = "placeholder"
1577            self.state_lock.release()
1578        else:
1579            eid = self.exp_stem
1580            for i in range(0,5):
1581                eid += random.choice(string.ascii_letters)
1582            self.state_lock.acquire()
1583            while (self.state.has_key(eid)):
1584                eid = self.exp_stem
1585                for i in range(0,5):
1586                    eid += random.choice(string.ascii_letters)
1587            # To avoid another thread picking this localname
1588            self.state[eid] = "placeholder"
1589            self.state_lock.release()
1590
1591        try: 
1592            # This catches exceptions to clear the placeholder if necessary
1593            try:
1594                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1595            except ValueError:
1596                raise service_error(service_error.server_config, 
1597                        "Bad key type (%s)" % self.ssh_type)
1598
1599            user = req.get('user', None)
1600            if user == None:
1601                raise service_error(service_error.req, "No user")
1602
1603            master = req.get('master', None)
1604            if not master:
1605                raise service_error(service_error.req,
1606                        "No master testbed label")
1607            export_project = req.get('exportProject', None)
1608            if not export_project:
1609                raise service_error(service_error.req, "No export project")
1610           
1611            if self.splitter_url:
1612                self.log.debug("Calling remote splitter at %s" % \
1613                        self.splitter_url)
1614                split_data = self.remote_splitter(self.splitter_url,
1615                        file_content, master)
1616            else:
1617                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1618                    str(self.muxmax), '-m', master]
1619
1620                if self.fedkit:
1621                    tclcmd.append('-k')
1622
1623                if self.gatewaykit:
1624                    tclcmd.append('-K')
1625
1626                tclcmd.extend([pid, gid, eid, tclfile])
1627
1628                self.log.debug("running local splitter %s", " ".join(tclcmd))
1629                tclparser = Popen(tclcmd, stdout=PIPE)
1630                split_data = tclparser.stdout
1631
1632            allocated = { }         # Testbeds we can access
1633            started = { }           # Testbeds where a sub-experiment started
1634                                # successfully
1635
1636            # Objects to parse the splitter output (defined above)
1637            parse_current_testbed = self.current_testbed(eid, tmpdir,
1638                    self.fedkit, self.gatewaykit)
1639            parse_allbeds = self.allbeds(self.get_access)
1640            parse_gateways = self.gateways(eid, master, tmpdir,
1641                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1642                    self.fedkit)
1643            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1644                        "^#\s+End\s+Vtopo")
1645            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1646                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1647            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1648                    "^#\s+End\s+tarfiles")
1649            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1650                    "^#\s+End\s+rpms")
1651
1652            # Working on the split data
1653            for line in split_data:
1654                line = line.rstrip()
1655                if parse_current_testbed(line, master, allocated, tbparams):
1656                    continue
1657                elif parse_allbeds(line, user, tbparams, master, export_project,
1658                        access_user):
1659                    continue
1660                elif parse_gateways(line, allocated, tbparams):
1661                    continue
1662                elif parse_vtopo(line):
1663                    continue
1664                elif parse_hostnames(line):
1665                    continue
1666                elif parse_tarfiles(line):
1667                    continue
1668                elif parse_rpms(line):
1669                    continue
1670                else:
1671                    raise service_error(service_error.internal, 
1672                            "Bad tcl parse? %s" % line)
1673            # Virtual topology and visualization
1674            vtopo = self.gentopo(parse_vtopo.str)
1675            if not vtopo:
1676                raise service_error(service_error.internal, 
1677                        "Failed to generate virtual topology")
1678
1679            vis = self.genviz(vtopo)
1680            if not vis:
1681                raise service_error(service_error.internal, 
1682                        "Failed to generate visualization")
1683           
1684            # save federant information
1685            for k in allocated.keys():
1686                tbparams[k]['federant'] = {\
1687                        'name': [ { 'localname' : eid} ],\
1688                        'emulab': tbparams[k]['emulab'],\
1689                        'allocID' : tbparams[k]['allocID'],\
1690                        'master' : k == master,\
1691                    }
1692
1693
1694            # Copy tarfiles and rpms needed at remote sites into a staging area
1695            try:
1696                if self.fedkit:
1697                    for t in self.fedkit:
1698                        parse_tarfiles.list.append(t[1])
1699                if self.gatewaykit:
1700                    for t in self.gatewaykit:
1701                        parse_tarfiles.list.append(t[1])
1702                for t in parse_tarfiles.list:
1703                    if not os.path.exists("%s/tarfiles" % tmpdir):
1704                        os.mkdir("%s/tarfiles" % tmpdir)
1705                    self.copy_file(t, "%s/tarfiles/%s" % \
1706                            (tmpdir, os.path.basename(t)))
1707                for r in parse_rpms.list:
1708                    if not os.path.exists("%s/rpms" % tmpdir):
1709                        os.mkdir("%s/rpms" % tmpdir)
1710                    self.copy_file(r, "%s/rpms/%s" % \
1711                            (tmpdir, os.path.basename(r)))
1712                # A null experiment file in case we need to create a remote
1713                # experiment from scratch
1714                f = open("%s/null.tcl" % tmpdir, "w")
1715                print >>f, """
1716set ns [new Simulator]
1717source tb_compat.tcl
1718
1719set a [$ns node]
1720
1721$ns rtproto Session
1722$ns run
1723"""
1724                f.close()
1725
1726            except IOError, e:
1727                raise service_error(service_error.internal, 
1728                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1729
1730        except service_error, e:
1731            # If something goes wrong in the parse (usually an access error)
1732            # clear the placeholder state.  From here on out the code delays
1733            # exceptions.
1734            self.state_lock.acquire()
1735            del self.state[eid]
1736            self.state_lock.release()
1737            raise e
1738
1739        thread_pool = self.thread_pool(self.nthreads)
1740        threads = [ ]
1741
1742        for tb in [ k for k in allocated.keys() if k != master]:
1743            # Create and start a thread to start the segment, and save it to
1744            # get the return value later
1745            thread_pool.wait_for_slot()
1746            t  = self.pooled_thread(target=self.start_segment, 
1747                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1748                    pdata=thread_pool, trace_file=self.trace_file)
1749            threads.append(t)
1750            t.start()
1751
1752        # Wait until all finish
1753        thread_pool.wait_for_all_done()
1754
1755        # If none failed, start the master
1756        failed = [ t.getName() for t in threads if not t.rv ]
1757
1758        if len(failed) == 0:
1759            if not self.start_segment(master, eid, tbparams, tmpdir):
1760                failed.append(master)
1761
1762        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1763        # If one failed clean up, unless fail_soft is set
1764        if failed:
1765            if not fail_soft:
1766                thread_pool.clear()
1767                for tb in succeeded:
1768                    # Create and start a thread to stop the segment
1769                    thread_pool.wait_for_slot()
1770                    t  = self.pooled_thread(target=self.stop_segment, 
1771                            args=(tb, eid, tbparams), name=tb,
1772                            pdata=thread_pool, trace_file=self.trace_file)
1773                    t.start()
1774                # Wait until all finish
1775                thread_pool.wait_for_all_done()
1776
1777                # release the allocations
1778                for tb in tbparams.keys():
1779                    self.release_access(tb, tbparams[tb]['allocID'])
1780                # Remove the placeholder
1781                self.state_lock.acquire()
1782                del self.state[eid]
1783                self.state_lock.release()
1784
1785                raise service_error(service_error.federant,
1786                    "Swap in failed on %s" % ",".join(failed))
1787        else:
1788            self.log.info("[start_segment]: Experiment %s started" % eid)
1789
1790        # Generate an ID for the experiment (slice) and a certificate that the
1791        # allocator can use to prove they own it.  We'll ship it back through
1792        # the encrypted connection.
1793        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1794
1795        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1796
1797        # Walk up tmpdir, deleting as we go
1798        for path, dirs, files in os.walk(tmpdir, topdown=False):
1799            for f in files:
1800                os.remove(os.path.join(path, f))
1801            for d in dirs:
1802                os.rmdir(os.path.join(path, d))
1803        os.rmdir(tmpdir)
1804
1805        # The deepcopy prevents the allocation ID and other binaries from being
1806        # translated into other formats
1807        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1808                for tb in tbparams.keys() \
1809                    if tbparams[tb].has_key('federant') ],\
1810                    'vtopo': vtopo,\
1811                    'vis' : vis,
1812                    'experimentID' : [\
1813                            { 'fedid': copy.copy(expid) }, \
1814                            { 'localname': eid },\
1815                        ],\
1816                    'experimentAccess': { 'X509' : expcert },\
1817                }
1818        # remove the allocationID info from each federant
1819        for f in resp['federant']:
1820            if f.has_key('allocID'): del f['allocID']
1821
1822        # Insert the experiment into our state and update the disk copy
1823        self.state_lock.acquire()
1824        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1825                for tb in tbparams.keys() \
1826                    if tbparams[tb].has_key('federant') ],\
1827                    'vtopo': vtopo,\
1828                    'vis' : vis,
1829                    'owner': fid,
1830                    'experimentID' : [\
1831                            { 'fedid': expid }, { 'localname': eid },\
1832                        ],\
1833                }
1834        self.state[eid] = self.state[expid]
1835        if self.state_filename: self.write_state()
1836        self.state_lock.release()
1837
1838        self.auth.set_attribute(fid, expid)
1839        self.auth.set_attribute(expid, expid)
1840
1841        if not failed:
1842            return resp
1843        else:
1844            raise service_error(service_error.partial, \
1845                    "Partial swap in on %s" % ",".join(succeeded))
1846
1847    def check_experiment_access(self, fid, key):
1848        """
1849        Confirm that the fid has access to the experiment.  Though a request
1850        may be made in terms of a local name, the access attribute is always
1851        the experiment's fedid.
1852        """
1853        if not isinstance(key, fedid):
1854            self.state_lock.acquire()
1855            if self.state.has_key(key):
1856                if isinstance(self.state[key], dict):
1857                    try:
1858                        kl = [ f['fedid'] for f in \
1859                                self.state[key]['experimentID']\
1860                                    if f.has_key('fedid') ]
1861                    except KeyError:
1862                        self.state_lock.release()
1863                        raise service_error(service_error.internal, 
1864                                "No fedid for experiment %s when checking " +\
1865                                        "access(!?)" % key)
1866                    if len(kl) == 1:
1867                        key = kl[0]
1868                    else:
1869                        self.state_lock.release()
1870                        raise service_error(service_error.internal, 
1871                                "multiple fedids for experiment %s when " +\
1872                                        "checking access(!?)" % key)
1873                elif isinstance(self.state[key], str):
1874                    self.state_lock.release()
1875                    raise service_error(service_error.internal, 
1876                            ("experiment %s is placeholder.  " +\
1877                                    "Creation in progress or aborted oddly") \
1878                                    % key)
1879                else:
1880                    self.state_lock.release()
1881                    raise service_error(service_error.internal, 
1882                            "Unexpected state for %s" % key)
1883
1884            else:
1885                self.state_lock.release()
1886                raise service_error(service_error.access, "Access Denied")
1887            self.state_lock.release()
1888
1889        if self.auth.check_attribute(fid, key):
1890            return True
1891        else:
1892            raise service_error(service_error.access, "Access Denied")
1893
1894
1895
1896    def get_vtopo(self, req, fid):
1897        """
1898        Return the stored virtual topology for this experiment
1899        """
1900        rv = None
1901
1902        req = req.get('VtopoRequestBody', None)
1903        if not req:
1904            raise service_error(service_error.req,
1905                    "Bad request format (no VtopoRequestBody)")
1906        exp = req.get('experiment', None)
1907        if exp:
1908            if exp.has_key('fedid'):
1909                key = exp['fedid']
1910                keytype = "fedid"
1911            elif exp.has_key('localname'):
1912                key = exp['localname']
1913                keytype = "localname"
1914            else:
1915                raise service_error(service_error.req, "Unknown lookup type")
1916        else:
1917            raise service_error(service_error.req, "No request?")
1918
1919        self.check_experiment_access(fid, key)
1920
1921        self.state_lock.acquire()
1922        if self.state.has_key(key):
1923            rv = { 'experiment' : {keytype: key },\
1924                    'vtopo': self.state[key]['vtopo'],\
1925                }
1926        self.state_lock.release()
1927
1928        if rv: return rv
1929        else: raise service_error(service_error.req, "No such experiment")
1930
1931    def get_vis(self, req, fid):
1932        """
1933        Return the stored visualization for this experiment
1934        """
1935        rv = None
1936
1937        req = req.get('VisRequestBody', None)
1938        if not req:
1939            raise service_error(service_error.req,
1940                    "Bad request format (no VisRequestBody)")
1941        exp = req.get('experiment', None)
1942        if exp:
1943            if exp.has_key('fedid'):
1944                key = exp['fedid']
1945                keytype = "fedid"
1946            elif exp.has_key('localname'):
1947                key = exp['localname']
1948                keytype = "localname"
1949            else:
1950                raise service_error(service_error.req, "Unknown lookup type")
1951        else:
1952            raise service_error(service_error.req, "No request?")
1953
1954        self.check_experiment_access(fid, key)
1955
1956        self.state_lock.acquire()
1957        if self.state.has_key(key):
1958            rv =  { 'experiment' : {keytype: key },\
1959                    'vis': self.state[key]['vis'],\
1960                    }
1961        self.state_lock.release()
1962
1963        if rv: return rv
1964        else: raise service_error(service_error.req, "No such experiment")
1965
1966    def get_info(self, req, fid):
1967        """
1968        Return all the stored info about this experiment
1969        """
1970        rv = None
1971
1972        req = req.get('InfoRequestBody', None)
1973        if not req:
1974            raise service_error(service_error.req,
1975                    "Bad request format (no VisRequestBody)")
1976        exp = req.get('experiment', None)
1977        if exp:
1978            if exp.has_key('fedid'):
1979                key = exp['fedid']
1980                keytype = "fedid"
1981            elif exp.has_key('localname'):
1982                key = exp['localname']
1983                keytype = "localname"
1984            else:
1985                raise service_error(service_error.req, "Unknown lookup type")
1986        else:
1987            raise service_error(service_error.req, "No request?")
1988
1989        self.check_experiment_access(fid, key)
1990
1991        # The state may be massaged by the service function that called
1992        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1993        # state.
1994        self.state_lock.acquire()
1995        if self.state.has_key(key):
1996            rv = copy.deepcopy(self.state[key])
1997        self.state_lock.release()
1998        # Remove the owner info
1999        del rv['owner']
2000        # remove the allocationID info from each federant
2001        for f in rv['federant']:
2002            if f.has_key('allocID'): del f['allocID']
2003
2004        if rv: return rv
2005        else: raise service_error(service_error.req, "No such experiment")
2006
2007
2008    def terminate_experiment(self, req, fid):
2009        """
2010        Swap this experiment out on the federants and delete the shared
2011        information
2012        """
2013        tbparams = { }
2014        req = req.get('TerminateRequestBody', None)
2015        if not req:
2016            raise service_error(service_error.req,
2017                    "Bad request format (no TerminateRequestBody)")
2018        exp = req.get('experiment', None)
2019        if exp:
2020            if exp.has_key('fedid'):
2021                key = exp['fedid']
2022                keytype = "fedid"
2023            elif exp.has_key('localname'):
2024                key = exp['localname']
2025                keytype = "localname"
2026            else:
2027                raise service_error(service_error.req, "Unknown lookup type")
2028        else:
2029            raise service_error(service_error.req, "No request?")
2030
2031        self.check_experiment_access(fid, key)
2032
2033        self.state_lock.acquire()
2034        fed_exp = self.state.get(key, None)
2035
2036        if fed_exp:
2037            # This branch of the conditional holds the lock to generate a
2038            # consistent temporary tbparams variable to deallocate experiments.
2039            # It releases the lock to do the deallocations and reacquires it to
2040            # remove the experiment state when the termination is complete.
2041            ids = []
2042            #  experimentID is a list of dicts that are self-describing
2043            #  identifiers.  This finds all the fedids and localnames - the
2044            #  keys of self.state - and puts them into ids.
2045            for id in fed_exp.get('experimentID', []):
2046                if id.has_key('fedid'): ids.append(id['fedid'])
2047                if id.has_key('localname'): ids.append(id['localname'])
2048
2049            # Construct enough of the tbparams to make the stop_segment calls
2050            # work
2051            for fed in fed_exp['federant']:
2052                try:
2053                    for e in fed['name']:
2054                        eid = e.get('localname', None)
2055                        if eid: break
2056                    else:
2057                        continue
2058
2059                    p = fed['emulab']['project']
2060
2061                    project = p['name']['localname']
2062                    tb = p['testbed']['localname']
2063                    user = p['user'][0]['userID']['localname']
2064
2065                    domain = fed['emulab']['domain']
2066                    host  = fed['emulab']['ops']
2067                    aid = fed['allocID']
2068                except KeyError, e:
2069                    continue
2070                tbparams[tb] = {\
2071                        'user': user,\
2072                        'domain': domain,\
2073                        'project': project,\
2074                        'host': host,\
2075                        'eid': eid,\
2076                        'aid': aid,\
2077                    }
2078            self.state_lock.release()
2079
2080            # Stop everyone.
2081            thread_pool = self.thread_pool(self.nthreads)
2082            for tb in tbparams.keys():
2083                # Create and start a thread to stop the segment
2084                thread_pool.wait_for_slot()
2085                t  = self.pooled_thread(target=self.stop_segment, 
2086                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2087                        pdata=thread_pool, trace_file=self.trace_file)
2088                t.start()
2089            # Wait for completions
2090            thread_pool.wait_for_all_done()
2091
2092            # release the allocations
2093            for tb in tbparams.keys():
2094                self.release_access(tb, tbparams[tb]['aid'])
2095
2096            # Remove the terminated experiment
2097            self.state_lock.acquire()
2098            for id in ids:
2099                if self.state.has_key(id): del self.state[id]
2100
2101            if self.state_filename: self.write_state()
2102            self.state_lock.release()
2103
2104            return { 'experiment': exp }
2105        else:
2106            # Don't forget to release the lock
2107            self.state_lock.release()
2108            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.