source: fedd/federation/experiment_control.py @ b535e19

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02 version-1.01
Last change on this file since b535e19 was b535e19, checked in by Ted Faber <faber@…>, 15 years ago

Needless complexity has been made to vanish.

  • Property mode set to 100644
File size: 64.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167
168        def parse_tarfile_list(tf):
169            """
170            Parse a tarfile list from the configuration.  This is a set of
171            paths and tarfiles separated by spaces.
172            """
173            rv = [ ]
174            if tf is not None:
175                tl = tf.split()
176                while len(tl) > 1:
177                    p, t = tl[0:2]
178                    del tl[0:2]
179                    rv.append((p, t))
180            return rv
181
182        self.thread_with_rv = experiment_control_local.pooled_thread
183        self.thread_pool = experiment_control_local.thread_pool
184
185        self.cert_file = config.get("experiment_control", "cert_file")
186        if self.cert_file:
187            self.cert_pwd = config.get("experiment_control", "cert_pwd")
188        else:
189            self.cert_file = config.get("globals", "cert_file")
190            self.cert_pwd = config.get("globals", "cert_pwd")
191
192        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
193                or config.get("globals", "trusted_certs")
194
195        self.exp_stem = "fed-stem"
196        self.log = logging.getLogger("fedd.experiment_control")
197        set_log_level(config, "experiment_control", self.log)
198        self.muxmax = 2
199        self.nthreads = 2
200        self.randomize_experiments = False
201
202        self.scp_exec = "/usr/bin/scp"
203        self.splitter = None
204        self.ssh_exec="/usr/bin/ssh"
205        self.ssh_keygen = "/usr/bin/ssh-keygen"
206        self.ssh_identity_file = None
207
208
209        self.debug = config.getboolean("experiment_control", "create_debug")
210        self.state_filename = config.get("experiment_control", 
211                "experiment_state")
212        self.splitter_url = config.get("experiment_control", "splitter_uri")
213        self.fedkit = parse_tarfile_list(\
214                config.get("experiment_control", "fedkit"))
215        self.gatewaykit = parse_tarfile_list(\
216                config.get("experiment_control", "gatewaykit"))
217        accessdb_file = config.get("experiment_control", "accessdb")
218
219        self.ssh_pubkey_file = config.get("experiment_control", 
220                "ssh_pubkey_file")
221        self.ssh_privkey_file = config.get("experiment_control",
222                "ssh_privkey_file")
223        # NB for internal master/slave ops, not experiment setup
224        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
225        self.state = { }
226        self.state_lock = Lock()
227        self.tclsh = "/usr/local/bin/otclsh"
228        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
229                config.get("experiment_control", "tcl_splitter",
230                        "/usr/testbed/lib/ns2ir/parse.tcl")
231        mapdb_file = config.get("experiment_control", "mapdb")
232        self.trace_file = sys.stderr
233
234        self.def_expstart = \
235                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
236                "/tmp/federate";
237        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
238                "FEDDIR/hosts";
239        self.def_gwstart = \
240                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
241                "/tmp/bridge.log";
242        self.def_mgwstart = \
243                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
244                "/tmp/bridge.log";
245        self.def_gwimage = "FBSD61-TUNNEL2";
246        self.def_gwtype = "pc";
247        self.local_access = { }
248
249        if auth:
250            self.auth = auth
251        else:
252            self.log.error(\
253                    "[access]: No authorizer initialized, creating local one.")
254            auth = authorizer()
255
256
257        if self.ssh_pubkey_file:
258            try:
259                f = open(self.ssh_pubkey_file, 'r')
260                self.ssh_pubkey = f.read()
261                f.close()
262            except IOError:
263                raise service_error(service_error.internal,
264                        "Cannot read sshpubkey")
265        else:
266            raise service_error(service_error.internal, 
267                    "No SSH public key file?")
268
269        if not self.ssh_privkey_file:
270            raise service_error(service_error.internal, 
271                    "No SSH public key file?")
272
273
274        if mapdb_file:
275            self.read_mapdb(mapdb_file)
276        else:
277            self.log.warn("[experiment_control] No testbed map, using defaults")
278            self.tbmap = { 
279                    'deter':'https://users.isi.deterlab.net:23235',
280                    'emulab':'https://users.isi.deterlab.net:23236',
281                    'ucb':'https://users.isi.deterlab.net:23237',
282                    }
283
284        if accessdb_file:
285                self.read_accessdb(accessdb_file)
286        else:
287            raise service_error(service_error.internal,
288                    "No accessdb specified in config")
289
290        # Grab saved state.  OK to do this w/o locking because it's read only
291        # and only one thread should be in existence that can see self.state at
292        # this point.
293        if self.state_filename:
294            self.read_state()
295
296        # Dispatch tables
297        self.soap_services = {\
298                'Create': soap_handler('Create', self.create_experiment),
299                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
300                'Vis': soap_handler('Vis', self.get_vis),
301                'Info': soap_handler('Info', self.get_info),
302                'Terminate': soap_handler('Terminate', 
303                    self.terminate_experiment),
304        }
305
306        self.xmlrpc_services = {\
307                'Create': xmlrpc_handler('Create', self.create_experiment),
308                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
309                'Vis': xmlrpc_handler('Vis', self.get_vis),
310                'Info': xmlrpc_handler('Info', self.get_info),
311                'Terminate': xmlrpc_handler('Terminate',
312                    self.terminate_experiment),
313        }
314
315    def copy_file(self, src, dest, size=1024):
316        """
317        Exceedingly simple file copy.
318        """
319        s = open(src,'r')
320        d = open(dest, 'w')
321
322        buf = "x"
323        while buf != "":
324            buf = s.read(size)
325            d.write(buf)
326        s.close()
327        d.close()
328
329    # Call while holding self.state_lock
330    def write_state(self):
331        """
332        Write a new copy of experiment state after copying the existing state
333        to a backup.
334
335        State format is a simple pickling of the state dictionary.
336        """
337        if os.access(self.state_filename, os.W_OK):
338            self.copy_file(self.state_filename, \
339                    "%s.bak" % self.state_filename)
340        try:
341            f = open(self.state_filename, 'w')
342            pickle.dump(self.state, f)
343        except IOError, e:
344            self.log.error("Can't write file %s: %s" % \
345                    (self.state_filename, e))
346        except pickle.PicklingError, e:
347            self.log.error("Pickling problem: %s" % e)
348        except TypeError, e:
349            self.log.error("Pickling problem (TypeError): %s" % e)
350
351    # Call while holding self.state_lock
352    def read_state(self):
353        """
354        Read a new copy of experiment state.  Old state is overwritten.
355
356        State format is a simple pickling of the state dictionary.
357        """
358        try:
359            f = open(self.state_filename, "r")
360            self.state = pickle.load(f)
361            self.log.debug("[read_state]: Read state from %s" % \
362                    self.state_filename)
363        except IOError, e:
364            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
365                    % (self.state_filename, e))
366        except pickle.UnpicklingError, e:
367            self.log.warning(("[read_state]: No saved state: " + \
368                    "Unpickling failed: %s") % e)
369       
370        for k in self.state.keys():
371            try:
372                # This list should only have one element in it, but phrasing it
373                # as a for loop doesn't cost much, really.  We have to find the
374                # fedid elements anyway.
375                for eid in [ f['fedid'] \
376                        for f in self.state[k]['experimentID']\
377                            if f.has_key('fedid') ]:
378                    self.auth.set_attribute(self.state[k]['owner'], eid)
379            except KeyError, e:
380                self.log.warning("[read_state]: State ownership or identity " +\
381                        "misformatted in %s: %s" % (self.state_filename, e))
382
383
384    def read_accessdb(self, accessdb_file):
385        """
386        Read the mapping from fedids that can create experiments to their name
387        in the 3-level access namespace.  All will be asserted from this
388        testbed and can include the local username and porject that will be
389        asserted on their behalf by this fedd.  Each fedid is also added to the
390        authorization system with the "create" attribute.
391        """
392        self.accessdb = {}
393        # These are the regexps for parsing the db
394        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
395        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
396                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
397        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
398                "\s*->\s*(" + name_expr + ")\s*$")
399        lineno = 0
400
401        # Parse the mappings and store in self.authdb, a dict of
402        # fedid -> (proj, user)
403        try:
404            f = open(accessdb_file, "r")
405            for line in f:
406                lineno += 1
407                line = line.strip()
408                if len(line) == 0 or line.startswith('#'):
409                    continue
410                m = project_line.match(line)
411                if m:
412                    fid = fedid(hexstr=m.group(1))
413                    project, user = m.group(2,3)
414                    if not self.accessdb.has_key(fid):
415                        self.accessdb[fid] = []
416                    self.accessdb[fid].append((project, user))
417                    continue
418
419                m = user_line.match(line)
420                if m:
421                    fid = fedid(hexstr=m.group(1))
422                    project = None
423                    user = m.group(2)
424                    if not self.accessdb.has_key(fid):
425                        self.accessdb[fid] = []
426                    self.accessdb[fid].append((project, user))
427                    continue
428                self.log.warn("[experiment_control] Error parsing access " +\
429                        "db %s at line %d" %  (accessdb_file, lineno))
430        except IOError:
431            raise service_error(service_error.internal,
432                    "Error opening/reading %s as experiment " +\
433                            "control accessdb" %  accessdb_file)
434        f.close()
435
436        # Initialize the authorization attributes
437        for fid in self.accessdb.keys():
438            self.auth.set_attribute(fid, 'create')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except IOError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def scp_file(self, file, user, host, dest=""):
467        """
468        scp a file to the remote host.  If debug is set the action is only
469        logged.
470        """
471
472        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
473                '-o', 'StrictHostKeyChecking yes', '-i', 
474                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
475        rv = 0
476
477        try:
478            dnull = open("/dev/null", "w")
479        except IOError:
480            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
481            dnull = Null
482
483        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
484        if not self.debug:
485            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
486
487        return rv == 0
488
489    def ssh_cmd(self, user, host, cmd, wname=None):
490        """
491        Run a remote command on host as user.  If debug is set, the action is
492        only logged.
493        """
494        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
495                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
496                (self.ssh_exec, self.ssh_privkey_file, 
497                        user, host, cmd)
498
499        try:
500            dnull = open("/dev/null", "r")
501        except IOError:
502            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
503            dnull = Null
504
505        self.log.debug("[ssh_cmd]: %s" % sh_str)
506        if not self.debug:
507            if dnull:
508                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
509            else:
510                sub = Popen(sh_str, shell=True)
511            return sub.wait() == 0
512        else:
513            return True
514
515    def ship_configs(self, host, user, src_dir, dest_dir):
516        """
517        Copy federant-specific configuration files to the federant.
518        """
519        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
520            return False
521        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
522            return False
523
524        for f in os.listdir(src_dir):
525            if os.path.isdir(f):
526                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
527                        "%s/%s" % (dest_dir, f)):
528                    return False
529            else:
530                if not self.scp_file("%s/%s" % (src_dir, f), 
531                        user, host, dest_dir):
532                    return False
533        return True
534
535    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
536        """
537        Start a sub-experiment on a federant.
538
539        Get the current state, modify or create as appropriate, ship data and
540        configs and start the experiment.  There are small ordering differences
541        based on the initial state of the sub-experiment.
542        """
543        # ops node in the federant
544        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
545        user = tbparams[tb]['user']     # federant user
546        pid = tbparams[tb]['project']   # federant project
547        # XXX
548        base_confs = ( "hosts",)
549        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
550        # command to test experiment state
551        expinfo_exec = "/usr/testbed/bin/expinfo" 
552        # Configuration directories on the remote machine
553        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
554        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
555        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
556        # Regular expressions to parse the expinfo response
557        state_re = re.compile("State:\s+(\w+)")
558        no_exp_re = re.compile("^No\s+such\s+experiment")
559        state = None    # Experiment state parsed from expinfo
560        # The expinfo ssh command.  Note the identity restriction to use only
561        # the identity provided in the pubkey given.
562        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
563                'StrictHostKeyChecking yes', '-i', 
564                self.ssh_privkey_file, "%s@%s" % (user, host), 
565                expinfo_exec, pid, eid]
566
567        # Get status
568        self.log.debug("[start_segment]: %s"% " ".join(cmd))
569        dev_null = None
570        try:
571            dev_null = open("/dev/null", "a")
572        except IOError, e:
573            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
574
575        if self.debug:
576            state = 'swapped'
577            rv = 0
578        else:
579            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
580            for line in status.stdout:
581                m = state_re.match(line)
582                if m: state = m.group(1)
583                else:
584                    m = no_exp_re.match(line)
585                    if m: state = "none"
586            rv = status.wait()
587
588        # If the experiment is not present the subcommand returns a non-zero
589        # return value.  If we successfully parsed a "none" outcome, ignore the
590        # return code.
591        if rv != 0 and state != "none":
592            raise service_error(service_error.internal,
593                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
594
595        self.log.debug("[start_segment]: %s: %s" % (tb, state))
596        self.log.info("[start_segment]:transferring experiment to %s" % tb)
597
598        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
599            return False
600        # Clear the federation config dirs
601        if not self.ssh_cmd(user, host, 
602                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
603            return False
604        # Clear and create the tarfiles and rpm directories
605        for d in (tarfiles_dir, rpms_dir):
606            if not self.ssh_cmd(user, host, 
607                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
608                return False
609            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
610                    "create tarfiles"):
611                return False
612       
613        if state == 'active':
614            # Create the federation config dirs (do not move outside the
615            # conditional.  Happens later in new expriment creation)
616            if not self.ssh_cmd(user, host, 
617                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
618                return False
619            # Remote experiment is active.  Modify it.
620            for f in base_confs:
621                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
622                        "%s/%s" % (proj_dir, f)):
623                    return False
624            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
625                    proj_dir):
626                return False
627            if os.path.isdir("%s/tarfiles" % tmpdir):
628                if not self.ship_configs(host, user,
629                        "%s/tarfiles" % tmpdir, tarfiles_dir):
630                    return False
631            if os.path.isdir("%s/rpms" % tmpdir):
632                if not self.ship_configs(host, user,
633                        "%s/rpms" % tmpdir, tarfiles_dir):
634                    return False
635            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
636            if not self.ssh_cmd(user, host,
637                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
638                            (pid, eid, tclfile), "modexp"):
639                return False
640            return True
641        elif state == "swapped":
642            # Create the federation config dirs (do not move outside the
643            # conditional.  Happens later in new expriment creation)
644            if not self.ssh_cmd(user, host, 
645                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
646                return False
647            # Remote experiment swapped out.  Modify it and swap it in.
648            for f in base_confs:
649                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
650                        "%s/%s" % (proj_dir, f)):
651                    return False
652            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
653                    proj_dir):
654                return False
655            if os.path.isdir("%s/tarfiles" % tmpdir):
656                if not self.ship_configs(host, user,
657                        "%s/tarfiles" % tmpdir, tarfiles_dir):
658                    return False
659            if os.path.isdir("%s/rpms" % tmpdir):
660                if not self.ship_configs(host, user,
661                        "%s/rpms" % tmpdir, tarfiles_dir):
662                    return False
663            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
664            if not self.ssh_cmd(user, host,
665                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
666                    "modexp"):
667                return False
668            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
669            if not self.ssh_cmd(user, host,
670                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
671                    "swapexp"):
672                return False
673            return True
674        elif state == "none":
675            # No remote experiment.  Create one.  We do this in 2 steps so we
676            # can put the configuration files and scripts into the new
677            # experiment directories.
678
679            # Tarfiles must be present for creation to work
680            if os.path.isdir("%s/tarfiles" % tmpdir):
681                if not self.ship_configs(host, user,
682                        "%s/tarfiles" % tmpdir, tarfiles_dir):
683                    return False
684            if os.path.isdir("%s/rpms" % tmpdir):
685                if not self.ship_configs(host, user,
686                        "%s/rpms" % tmpdir, tarfiles_dir):
687                    return False
688            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
689            if not self.ssh_cmd(user, host,
690                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
691                            (pid, eid, tclfile), "startexp"):
692                return False
693            # Create the federation config dirs (do not move outside the
694            # conditional.)
695            if not self.ssh_cmd(user, host, 
696                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
697                return False
698            # After startexp the per-experiment directories exist
699            for f in base_confs:
700                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
701                        "%s/%s" % (proj_dir, f)):
702                    return False
703            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
704                    proj_dir):
705                return False
706            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
707            if not self.ssh_cmd(user, host,
708                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
709                    "swapexp"):
710                return False
711            return True
712        else:
713            self.log.debug("[start_segment]:unknown state %s" % state)
714            return False
715
716    def stop_segment(self, tb, eid, tbparams):
717        """
718        Stop a sub experiment by calling swapexp on the federant
719        """
720        user = tbparams[tb]['user']
721        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
722        pid = tbparams[tb]['project']
723
724        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
725        return self.ssh_cmd(user, host,
726                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
727
728       
729    def generate_ssh_keys(self, dest, type="rsa" ):
730        """
731        Generate a set of keys for the gateways to use to talk.
732
733        Keys are of type type and are stored in the required dest file.
734        """
735        valid_types = ("rsa", "dsa")
736        t = type.lower();
737        if t not in valid_types: raise ValueError
738        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
739
740        try:
741            trace = open("/dev/null", "w")
742        except IOError:
743            raise service_error(service_error.internal,
744                    "Cannot open /dev/null??");
745
746        # May raise CalledProcessError
747        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
748        rv = call(cmd, stdout=trace, stderr=trace)
749        if rv != 0:
750            raise service_error(service_error.internal, 
751                    "Cannot generate nonce ssh keys.  %s return code %d" \
752                            % (self.ssh_keygen, rv))
753
754    def gentopo(self, str):
755        """
756        Generate the topology dtat structure from the splitter's XML
757        representation of it.
758
759        The topology XML looks like:
760            <experiment>
761                <nodes>
762                    <node><vname></vname><ips>ip1:ip2</ips></node>
763                </nodes>
764                <lans>
765                    <lan>
766                        <vname></vname><vnode></vnode><ip></ip>
767                        <bandwidth></bandwidth><member>node:port</member>
768                    </lan>
769                </lans>
770        """
771        class topo_parse:
772            """
773            Parse the topology XML and create the dats structure.
774            """
775            def __init__(self):
776                # Typing of the subelements for data conversion
777                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
778                self.int_subelements = ( 'bandwidth',)
779                self.float_subelements = ( 'delay',)
780                # The final data structure
781                self.nodes = [ ]
782                self.lans =  [ ]
783                self.topo = { \
784                        'node': self.nodes,\
785                        'lan' : self.lans,\
786                    }
787                self.element = { }  # Current element being created
788                self.chars = ""     # Last text seen
789
790            def end_element(self, name):
791                # After each sub element the contents is added to the current
792                # element or to the appropriate list.
793                if name == 'node':
794                    self.nodes.append(self.element)
795                    self.element = { }
796                elif name == 'lan':
797                    self.lans.append(self.element)
798                    self.element = { }
799                elif name in self.str_subelements:
800                    self.element[name] = self.chars
801                    self.chars = ""
802                elif name in self.int_subelements:
803                    self.element[name] = int(self.chars)
804                    self.chars = ""
805                elif name in self.float_subelements:
806                    self.element[name] = float(self.chars)
807                    self.chars = ""
808
809            def found_chars(self, data):
810                self.chars += data.rstrip()
811
812
813        tp = topo_parse();
814        parser = xml.parsers.expat.ParserCreate()
815        parser.EndElementHandler = tp.end_element
816        parser.CharacterDataHandler = tp.found_chars
817
818        parser.Parse(str)
819
820        return tp.topo
821       
822
823    def genviz(self, topo):
824        """
825        Generate the visualization the virtual topology
826        """
827
828        neato = "/usr/local/bin/neato"
829        # These are used to parse neato output and to create the visualization
830        # file.
831        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
832        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
833                "%s</type></node>"
834
835        try:
836            # Node names
837            nodes = [ n['vname'] for n in topo['node'] ]
838            topo_lans = topo['lan']
839        except KeyError:
840            raise service_error(service_error.internal, "Bad topology")
841
842        lans = { }
843        links = { }
844
845        # Walk through the virtual topology, organizing the connections into
846        # 2-node connections (links) and more-than-2-node connections (lans).
847        # When a lan is created, it's added to the list of nodes (there's a
848        # node in the visualization for the lan).
849        for l in topo_lans:
850            if links.has_key(l['vname']):
851                if len(links[l['vname']]) < 2:
852                    links[l['vname']].append(l['vnode'])
853                else:
854                    nodes.append(l['vname'])
855                    lans[l['vname']] = links[l['vname']]
856                    del links[l['vname']]
857                    lans[l['vname']].append(l['vnode'])
858            elif lans.has_key(l['vname']):
859                lans[l['vname']].append(l['vnode'])
860            else:
861                links[l['vname']] = [ l['vnode'] ]
862
863
864        # Open up a temporary file for dot to turn into a visualization
865        try:
866            df, dotname = tempfile.mkstemp()
867            dotfile = os.fdopen(df, 'w')
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Failed to open file in genviz")
871
872        # Generate a dot/neato input file from the links, nodes and lans
873        try:
874            print >>dotfile, "graph G {"
875            for n in nodes:
876                print >>dotfile, '\t"%s"' % n
877            for l in links.keys():
878                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
879            for l in lans.keys():
880                for n in lans[l]:
881                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
882            print >>dotfile, "}"
883            dotfile.close()
884        except TypeError:
885            raise service_error(service_error.internal,
886                    "Single endpoint link in vtopo")
887        except IOError:
888            raise service_error(service_error.internal, "Cannot write dot file")
889
890        # Use dot to create a visualization
891        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
892                '-Gpack=true', dotname], stdout=PIPE)
893
894        # Translate dot to vis format
895        vis_nodes = [ ]
896        vis = { 'node': vis_nodes }
897        for line in dot.stdout:
898            m = vis_re.match(line)
899            if m:
900                vn = m.group(1)
901                vis_node = {'name': vn, \
902                        'x': float(m.group(2)),\
903                        'y' : float(m.group(3)),\
904                    }
905                if vn in links.keys() or vn in lans.keys():
906                    vis_node['type'] = 'lan'
907                else:
908                    vis_node['type'] = 'node'
909                vis_nodes.append(vis_node)
910        rv = dot.wait()
911
912        os.remove(dotname)
913        if rv == 0 : return vis
914        else: return None
915
916    def get_access(self, tb, nodes, user, tbparam, master, export_project,
917            access_user):
918        """
919        Get access to testbed through fedd and set the parameters for that tb
920        """
921        uri = self.tbmap.get(tb, None)
922        if not uri:
923            raise service_error(serice_error.server_config, 
924                    "Unknown testbed: %s" % tb)
925
926        # currently this lumps all users into one service access group
927        service_keys = [ a for u in user \
928                for a in u.get('access', []) \
929                    if a.has_key('sshPubkey')]
930
931        if len(service_keys) == 0:
932            raise service_error(service_error.req, 
933                    "Must have at least one SSH pubkey for services")
934
935
936        for p, u in access_user:
937            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
938                    "to %s") %  ((p or "None"), u, uri))
939
940            if p:
941                # Request with user and project specified
942                req = {\
943                        'destinationTestbed' : { 'uri' : uri },
944                        'project': { 
945                            'name': {'localname': p},
946                            'user': [ {'userID': { 'localname': u } } ],
947                            },
948                        'user':  user,
949                        'allocID' : { 'localname': 'test' },
950                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
951                        'serviceAccess' : service_keys
952                    }
953            else:
954                # Request with only user specified
955                req = {\
956                        'destinationTestbed' : { 'uri' : uri },
957                        'user':  [ {'userID': { 'localname': u } } ],
958                        'allocID' : { 'localname': 'test' },
959                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
960                        'serviceAccess' : service_keys
961                    }
962
963            if tb == master:
964                # NB, the export_project parameter is a dict that includes
965                # the type
966                req['exportProject'] = export_project
967
968            # node resources if any
969            if nodes != None and len(nodes) > 0:
970                rnodes = [ ]
971                for n in nodes:
972                    rn = { }
973                    image, hw, count = n.split(":")
974                    if image: rn['image'] = [ image ]
975                    if hw: rn['hardware'] = [ hw ]
976                    if count and int(count) >0 : rn['count'] = int(count)
977                    rnodes.append(rn)
978                req['resources']= { }
979                req['resources']['node'] = rnodes
980
981            try:
982                if self.local_access.has_key(uri):
983                    # Local access call
984                    req = { 'RequestAccessRequestBody' : req }
985                    r = self.local_access[uri].RequestAccess(req, 
986                            fedid(file=self.cert_file))
987                    r = { 'RequestAccessResponseBody' : r }
988                else:
989                    r = self.call_RequestAccess(uri, req, 
990                            self.cert_file, self.cert_pwd, self.trusted_certs)
991            except service_error, e:
992                if e.code == service_error.access:
993                    self.log.debug("[get_access] Access denied")
994                    r = None
995                    continue
996                else:
997                    raise e
998
999            if r.has_key('RequestAccessResponseBody'):
1000                # Through to here we have a valid response, not a fault.
1001                # Access denied is a fault, so something better or worse than
1002                # access denied has happened.
1003                r = r['RequestAccessResponseBody']
1004                self.log.debug("[get_access] Access granted")
1005                break
1006            else:
1007                raise service_error(service_error.protocol,
1008                        "Bad proxy response")
1009       
1010        if not r:
1011            raise service_error(service_error.access, 
1012                    "Access denied by %s (%s)" % (tb, uri))
1013
1014        e = r['emulab']
1015        p = e['project']
1016        tbparam[tb] = { 
1017                "boss": e['boss'],
1018                "host": e['ops'],
1019                "domain": e['domain'],
1020                "fs": e['fileServer'],
1021                "eventserver": e['eventServer'],
1022                "project": unpack_id(p['name']),
1023                "emulab" : e,
1024                "allocID" : r['allocID'],
1025                }
1026        # Make the testbed name be the label the user applied
1027        p['testbed'] = {'localname': tb }
1028
1029        for u in p['user']:
1030            role = u.get('role', None)
1031            if role == 'experimentCreation':
1032                tbparam[tb]['user'] = unpack_id(u['userID'])
1033                break
1034        else:
1035            raise service_error(service_error.internal, 
1036                    "No createExperimentUser from %s" %tb)
1037
1038        # Add attributes to barameter space.  We don't allow attributes to
1039        # overlay any parameters already installed.
1040        for a in e['fedAttr']:
1041            try:
1042                if a['attribute'] and isinstance(a['attribute'], basestring)\
1043                        and not tbparam[tb].has_key(a['attribute'].lower()):
1044                    tbparam[tb][a['attribute'].lower()] = a['value']
1045            except KeyError:
1046                self.log.error("Bad attribute in response: %s" % a)
1047       
1048    def release_access(self, tb, aid):
1049        """
1050        Release access to testbed through fedd
1051        """
1052
1053        uri = self.tbmap.get(tb, None)
1054        if not uri:
1055            raise service_error(serice_error.server_config, 
1056                    "Unknown testbed: %s" % tb)
1057
1058        if self.local_access.has_key(uri):
1059            resp = self.local_access[uri].ReleaseAccess(\
1060                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1061                    fedid(file=self.cert_file))
1062            resp = { 'ReleaseAccessResponseBody': resp } 
1063        else:
1064            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1065                    self.cert_file, self.cert_pwd, self.trusted_certs)
1066
1067        # better error coding
1068
1069    def remote_splitter(self, uri, desc, master):
1070
1071        req = {
1072                'description' : { 'ns2description': desc },
1073                'master': master,
1074                'include_fedkit': bool(self.fedkit),
1075                'include_gatewaykit': bool(self.gatewaykit)
1076            }
1077
1078        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1079                self.trusted_certs)
1080
1081        if r.has_key('Ns2SplitResponseBody'):
1082            r = r['Ns2SplitResponseBody']
1083            if r.has_key('output'):
1084                return r['output'].splitlines()
1085            else:
1086                raise service_error(service_error.protocol, 
1087                        "Bad splitter response (no output)")
1088        else:
1089            raise service_error(service_error.protocol, "Bad splitter response")
1090       
1091    class current_testbed:
1092        """
1093        Object for collecting the current testbed description.  The testbed
1094        description is saved to a file with the local testbed variables
1095        subsittuted line by line.
1096        """
1097        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1098            def tar_list_to_string(tl):
1099                if tl is None: return None
1100
1101                rv = ""
1102                for t in tl:
1103                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1104                            (t[0], os.path.basename(t[1]))
1105                return rv
1106
1107
1108            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1109            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1110            self.current_testbed = None
1111            self.testbed_file = None
1112
1113            self.def_expstart = \
1114                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1115            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1116            self.def_gwstart = \
1117                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1118            self.def_mgwstart = \
1119                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1120            self.def_gwimage = "FBSD61-TUNNEL2";
1121            self.def_gwtype = "pc";
1122            self.def_mgwcmd = '# '
1123            self.def_mgwcmdparams = ''
1124            self.def_gwcmd = '# '
1125            self.def_gwcmdparams = ''
1126
1127            self.eid = eid
1128            self.tmpdir = tmpdir
1129            # Convert fedkit and gateway kit (which are lists of tuples) into a
1130            # substituition string.
1131            self.fedkit = tar_list_to_string(fedkit)
1132            self.gatewaykit = tar_list_to_string(gatewaykit)
1133
1134        def __call__(self, line, master, allocated, tbparams):
1135            # Capture testbed topology descriptions
1136            if self.current_testbed == None:
1137                m = self.begin_testbed.match(line)
1138                if m != None:
1139                    self.current_testbed = m.group(1)
1140                    if self.current_testbed == None:
1141                        raise service_error(service_error.req,
1142                                "Bad request format (unnamed testbed)")
1143                    allocated[self.current_testbed] = \
1144                            allocated.get(self.current_testbed,0) + 1
1145                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1146                    if not os.path.exists(tb_dir):
1147                        try:
1148                            os.mkdir(tb_dir)
1149                        except IOError:
1150                            raise service_error(service_error.internal,
1151                                    "Cannot create %s" % tb_dir)
1152                    try:
1153                        self.testbed_file = open("%s/%s.%s.tcl" %
1154                                (tb_dir, self.eid, self.current_testbed), 'w')
1155                    except IOError:
1156                        self.testbed_file = None
1157                    return True
1158                else: return False
1159            else:
1160                m = self.end_testbed.match(line)
1161                if m != None:
1162                    if m.group(1) != self.current_testbed:
1163                        raise service_error(service_error.internal, 
1164                                "Mismatched testbed markers!?")
1165                    if self.testbed_file != None: 
1166                        self.testbed_file.close()
1167                        self.testbed_file = None
1168                    self.current_testbed = None
1169                elif self.testbed_file:
1170                    # Substitute variables and put the line into the local
1171                    # testbed file.
1172                    gwtype = tbparams[self.current_testbed].get(\
1173                            'connectortype', self.def_gwtype)
1174                    gwimage = tbparams[self.current_testbed].get(\
1175                            'connectorimage', self.def_gwimage)
1176                    mgwstart = tbparams[self.current_testbed].get(\
1177                            'masterconnectorstartcmd', self.def_mgwstart)
1178                    mexpstart = tbparams[self.current_testbed].get(\
1179                            'masternodestartcmd', self.def_mexpstart)
1180                    gwstart = tbparams[self.current_testbed].get(\
1181                            'slaveconnectorstartcmd', self.def_gwstart)
1182                    expstart = tbparams[self.current_testbed].get(\
1183                            'slavenodestartcmd', self.def_expstart)
1184                    project = tbparams[self.current_testbed].get('project')
1185                    gwcmd = tbparams[self.current_testbed].get(\
1186                            'slaveconnectorcmd', self.def_gwcmd)
1187                    gwcmdparams = tbparams[self.current_testbed].get(\
1188                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1189                    mgwcmd = tbparams[self.current_testbed].get(\
1190                            'masterconnectorcmd', self.def_gwcmd)
1191                    mgwcmdparams = tbparams[self.current_testbed].get(\
1192                            'masterconnectorcmdparams', self.def_gwcmdparams)
1193                    line = re.sub("GWTYPE", gwtype, line)
1194                    line = re.sub("GWIMAGE", gwimage, line)
1195                    if self.current_testbed == master:
1196                        line = re.sub("GWSTART", mgwstart, line)
1197                        line = re.sub("EXPSTART", mexpstart, line)
1198                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1199                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1200                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1201                    else:
1202                        line = re.sub("GWSTART", gwstart, line)
1203                        line = re.sub("EXPSTART", expstart, line)
1204                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1205                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1206                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1207                    #These expansions contain EID and PROJDIR.  NB these are
1208                    # local fedkit and gatewaykit, which are strings.
1209                    if self.fedkit:
1210                        line = re.sub("FEDKIT", self.fedkit, line)
1211                    if self.gatewaykit:
1212                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1213                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1214                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1215                    line = re.sub("EID", self.eid, line)
1216                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1217                            (project, self.eid), line)
1218                    print >>self.testbed_file, line
1219                return True
1220
1221    class allbeds:
1222        """
1223        Process the Allbeds section.  Get access to each federant and save the
1224        parameters in tbparams
1225        """
1226        def __init__(self, get_access):
1227            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1228            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1229            self.in_allbeds = False
1230            self.get_access = get_access
1231
1232        def __call__(self, line, user, tbparams, master, export_project,
1233                access_user):
1234            # Testbed access parameters
1235            if not self.in_allbeds:
1236                if self.begin_allbeds.match(line):
1237                    self.in_allbeds = True
1238                    return True
1239                else:
1240                    return False
1241            else:
1242                if self.end_allbeds.match(line):
1243                    self.in_allbeds = False
1244                else:
1245                    nodes = line.split('|')
1246                    tb = nodes.pop(0)
1247                    self.get_access(tb, nodes, user, tbparams, master,
1248                            export_project, access_user)
1249                return True
1250
1251    class gateways:
1252        def __init__(self, eid, master, tmpdir, gw_pubkey,
1253                gw_secretkey, copy_file, fedkit):
1254            self.begin_gateways = \
1255                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1256            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1257            self.current_gateways = None
1258            self.control_gateway = None
1259            self.active_end = { }
1260
1261            self.eid = eid
1262            self.master = master
1263            self.tmpdir = tmpdir
1264            self.gw_pubkey_base = gw_pubkey
1265            self.gw_secretkey_base = gw_secretkey
1266
1267            self.copy_file = copy_file
1268            self.fedkit = fedkit
1269
1270
1271        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1272                active_end, tbparams, dtb, myname, desthost, type):
1273            """
1274            Produce a gateway configuration file from a gateways line.
1275            """
1276
1277            sproject = tbparams[gw].get('project', 'project')
1278            dproject = tbparams[dtb].get('project', 'project')
1279            sdomain = ".%s.%s%s" % (eid, sproject,
1280                    tbparams[gw].get('domain', ".example.com"))
1281            ddomain = ".%s.%s%s" % (eid, dproject,
1282                    tbparams[dtb].get('domain', ".example.com"))
1283            boss = tbparams[master].get('boss', "boss")
1284            fs = tbparams[master].get('fs', "fs")
1285            event_server = "%s%s" % \
1286                    (tbparams[gw].get('eventserver', "event_server"),
1287                            tbparams[gw].get('domain', "example.com"))
1288            remote_event_server = "%s%s" % \
1289                    (tbparams[dtb].get('eventserver', "event_server"),
1290                            tbparams[dtb].get('domain', "example.com"))
1291            seer_control = "%s%s" % \
1292                    (tbparams[gw].get('control', "control"), sdomain)
1293            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1294
1295            if self.fedkit:
1296                remote_script_dir = "/usr/local/federation/bin"
1297                local_script_dir = "/usr/local/federation/bin"
1298            else:
1299                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1300                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1301
1302            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1303            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1304            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1305
1306            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1307            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1308
1309            # translate to lower case so the `hostname` hack for specifying
1310            # configuration files works.
1311            conf_file = conf_file.lower();
1312            remote_conf_file = remote_conf_file.lower();
1313
1314            if dtb == master:
1315                active = "false"
1316            elif gw == master:
1317                active = "true"
1318            elif active_end.has_key('%s-%s' % (dtb, gw)):
1319                active = "false"
1320            else:
1321                active_end['%s-%s' % (gw, dtb)] = 1
1322                active = "true"
1323
1324            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1325            print >>gwconfig, "Active: %s" % active
1326            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1327            if tunnel_iface:
1328                print >>gwconfig, "Interface: %s" % tunnel_iface
1329            print >>gwconfig, "BossName: %s" % boss
1330            print >>gwconfig, "FsName: %s" % fs
1331            print >>gwconfig, "EventServerName: %s" % event_server
1332            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1333            print >>gwconfig, "SeerControl: %s" % seer_control
1334            print >>gwconfig, "Type: %s" % type
1335            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1336            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1337                    local_script_dir
1338            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1339            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1340            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1341                    (remote_conf_dir, remote_conf_file)
1342            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1343            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1344            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1345            gwconfig.close()
1346
1347            return active == "true"
1348
1349        def __call__(self, line, allocated, tbparams):
1350            # Process gateways
1351            if not self.current_gateways:
1352                m = self.begin_gateways.match(line)
1353                if m:
1354                    self.current_gateways = m.group(1)
1355                    if allocated.has_key(self.current_gateways):
1356                        # This test should always succeed
1357                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1358                        if not os.path.exists(tb_dir):
1359                            try:
1360                                os.mkdir(tb_dir)
1361                            except IOError:
1362                                raise service_error(service_error.internal,
1363                                        "Cannot create %s" % tb_dir)
1364                    else:
1365                        # XXX
1366                        self.log.error("[gateways]: Ignoring gateways for " + \
1367                                "unknown testbed %s" % self.current_gateways)
1368                        self.current_gateways = None
1369                    return True
1370                else:
1371                    return False
1372            else:
1373                m = self.end_gateways.match(line)
1374                if m :
1375                    if m.group(1) != self.current_gateways:
1376                        raise service_error(service_error.internal,
1377                                "Mismatched gateway markers!?")
1378                    if self.control_gateway:
1379                        try:
1380                            cc = open("%s/%s/client.conf" %
1381                                    (self.tmpdir, self.current_gateways), 'w')
1382                            print >>cc, "ControlGateway: %s" % \
1383                                    self.control_gateway
1384                            if tbparams[self.master].has_key('smbshare'):
1385                                print >>cc, "SMBSHare: %s" % \
1386                                        tbparams[self.master]['smbshare']
1387                            print >>cc, "ProjectUser: %s" % \
1388                                    tbparams[self.master]['user']
1389                            print >>cc, "ProjectName: %s" % \
1390                                    tbparams[self.master]['project']
1391                            print >>cc, "ExperimentID: %s/%s" % \
1392                                    ( tbparams[self.master]['project'], \
1393                                    self.eid )
1394                            cc.close()
1395                        except IOError:
1396                            raise service_error(service_error.internal,
1397                                    "Error creating client config")
1398                        # XXX: This seer specific file should disappear
1399                        try:
1400                            cc = open("%s/%s/seer.conf" %
1401                                    (self.tmpdir, self.current_gateways),
1402                                    'w')
1403                            if self.current_gateways != self.master:
1404                                print >>cc, "ControlNode: %s" % \
1405                                        self.control_gateway
1406                            print >>cc, "ExperimentID: %s/%s" % \
1407                                    ( tbparams[self.master]['project'], \
1408                                    self.eid )
1409                            cc.close()
1410                        except IOError:
1411                            raise service_error(service_error.internal,
1412                                    "Error creating seer config")
1413                    else:
1414                        debug.error("[gateways]: No control gateway for %s" %\
1415                                    self.current_gateways)
1416                    self.current_gateways = None
1417                else:
1418                    dtb, myname, desthost, type = line.split(" ")
1419
1420                    if type == "control" or type == "both":
1421                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1422                                self.eid, 
1423                                tbparams[self.current_gateways]['project'],
1424                                tbparams[self.current_gateways]['domain'])
1425                    try:
1426                        active = self.gateway_conf_file(self.current_gateways,
1427                                self.master, self.eid, self.gw_pubkey_base,
1428                                self.gw_secretkey_base,
1429                                self.active_end, tbparams, dtb, myname,
1430                                desthost, type)
1431                    except IOError, e:
1432                        raise service_error(service_error.internal,
1433                                "Failed to write config file for %s" % \
1434                                        self.current_gateway)
1435           
1436                    gw_pubkey = "%s/keys/%s" % \
1437                            (self.tmpdir, self.gw_pubkey_base)
1438                    gw_secretkey = "%s/keys/%s" % \
1439                            (self.tmpdir, self.gw_secretkey_base)
1440
1441                    pkfile = "%s/%s/%s" % \
1442                            ( self.tmpdir, self.current_gateways, 
1443                                    self.gw_pubkey_base)
1444                    skfile = "%s/%s/%s" % \
1445                            ( self.tmpdir, self.current_gateways, 
1446                                    self.gw_secretkey_base)
1447
1448                    if not os.path.exists(pkfile):
1449                        try:
1450                            self.copy_file(gw_pubkey, pkfile)
1451                        except IOError:
1452                            service_error(service_error.internal,
1453                                    "Failed to copy pubkey file")
1454
1455                    if active and not os.path.exists(skfile):
1456                        try:
1457                            self.copy_file(gw_secretkey, skfile)
1458                        except IOError:
1459                            service_error(service_error.internal,
1460                                    "Failed to copy secretkey file")
1461                return True
1462
1463    class shunt_to_file:
1464        """
1465        Simple class to write data between two regexps to a file.
1466        """
1467        def __init__(self, begin, end, filename):
1468            """
1469            Begin shunting on a match of begin, stop on end, send data to
1470            filename.
1471            """
1472            self.begin = re.compile(begin)
1473            self.end = re.compile(end)
1474            self.in_shunt = False
1475            self.file = None
1476            self.filename = filename
1477
1478        def __call__(self, line):
1479            """
1480            Call this on each line in the input that may be shunted.
1481            """
1482            if not self.in_shunt:
1483                if self.begin.match(line):
1484                    self.in_shunt = True
1485                    try:
1486                        self.file = open(self.filename, "w")
1487                    except:
1488                        self.file = None
1489                        raise
1490                    return True
1491                else:
1492                    return False
1493            else:
1494                if self.end.match(line):
1495                    if self.file: 
1496                        self.file.close()
1497                        self.file = None
1498                    self.in_shunt = False
1499                else:
1500                    if self.file:
1501                        print >>self.file, line
1502                return True
1503
1504    class shunt_to_list:
1505        """
1506        Same interface as shunt_to_file.  Data collected in self.list, one list
1507        element per line.
1508        """
1509        def __init__(self, begin, end):
1510            self.begin = re.compile(begin)
1511            self.end = re.compile(end)
1512            self.in_shunt = False
1513            self.list = [ ]
1514       
1515        def __call__(self, line):
1516            if not self.in_shunt:
1517                if self.begin.match(line):
1518                    self.in_shunt = True
1519                    return True
1520                else:
1521                    return False
1522            else:
1523                if self.end.match(line):
1524                    self.in_shunt = False
1525                else:
1526                    self.list.append(line)
1527                return True
1528
1529    class shunt_to_string:
1530        """
1531        Same interface as shunt_to_file.  Data collected in self.str, all in
1532        one string.
1533        """
1534        def __init__(self, begin, end):
1535            self.begin = re.compile(begin)
1536            self.end = re.compile(end)
1537            self.in_shunt = False
1538            self.str = ""
1539       
1540        def __call__(self, line):
1541            if not self.in_shunt:
1542                if self.begin.match(line):
1543                    self.in_shunt = True
1544                    return True
1545                else:
1546                    return False
1547            else:
1548                if self.end.match(line):
1549                    self.in_shunt = False
1550                else:
1551                    self.str += line
1552                return True
1553
1554    def create_experiment(self, req, fid):
1555        """
1556        The external interface to experiment creation called from the
1557        dispatcher.
1558
1559        Creates a working directory, splits the incoming description using the
1560        splitter script and parses out the avrious subsections using the
1561        lcasses above.  Once each sub-experiment is created, use pooled threads
1562        to instantiate them and start it all up.
1563        """
1564
1565        if not self.auth.check_attribute(fid, 'create'):
1566            raise service_error(service_error.access, "Create access denied")
1567
1568        try:
1569            tmpdir = tempfile.mkdtemp(prefix="split-")
1570        except IOError:
1571            raise service_error(service_error.internal, "Cannot create tmp dir")
1572
1573        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1574        gw_secretkey_base = "fed.%s" % self.ssh_type
1575        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1576        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1577        tclfile = tmpdir + "/experiment.tcl"
1578        tbparams = { }
1579        try:
1580            access_user = self.accessdb[fid]
1581        except KeyError:
1582            raise service_error(service_error.internal,
1583                    "Access map and authorizer out of sync in " + \
1584                            "create_experiment for fedid %s"  % fid)
1585
1586        pid = "dummy"
1587        gid = "dummy"
1588        # XXX
1589        fail_soft = False
1590
1591        try:
1592            os.mkdir(tmpdir+"/keys")
1593        except OSError:
1594            raise service_error(service_error.internal,
1595                    "Can't make temporary dir")
1596
1597        req = req.get('CreateRequestBody', None)
1598        if not req:
1599            raise service_error(service_error.req,
1600                    "Bad request format (no CreateRequestBody)")
1601        # The tcl parser needs to read a file so put the content into that file
1602        descr=req.get('experimentdescription', None)
1603        if descr:
1604            file_content=descr.get('ns2description', None)
1605            if file_content:
1606                try:
1607                    f = open(tclfile, 'w')
1608                    f.write(file_content)
1609                    f.close()
1610                except IOError:
1611                    raise service_error(service_error.internal,
1612                            "Cannot write temp experiment description")
1613            else:
1614                raise service_error(service_error.req, 
1615                        "Only ns2descriptions supported")
1616        else:
1617            raise service_error(service_error.req, "No experiment description")
1618
1619        if req.has_key('experimentID') and \
1620                req['experimentID'].has_key('localname'):
1621            eid = req['experimentID']['localname']
1622            self.state_lock.acquire()
1623            while (self.state.has_key(eid)):
1624                eid += random.choice(string.ascii_letters)
1625            # To avoid another thread picking this localname
1626            self.state[eid] = "placeholder"
1627            self.state_lock.release()
1628        else:
1629            eid = self.exp_stem
1630            for i in range(0,5):
1631                eid += random.choice(string.ascii_letters)
1632            self.state_lock.acquire()
1633            while (self.state.has_key(eid)):
1634                eid = self.exp_stem
1635                for i in range(0,5):
1636                    eid += random.choice(string.ascii_letters)
1637            # To avoid another thread picking this localname
1638            self.state[eid] = "placeholder"
1639            self.state_lock.release()
1640
1641        try: 
1642            # This catches exceptions to clear the placeholder if necessary
1643            try:
1644                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1645            except ValueError:
1646                raise service_error(service_error.server_config, 
1647                        "Bad key type (%s)" % self.ssh_type)
1648
1649            user = req.get('user', None)
1650            if user == None:
1651                raise service_error(service_error.req, "No user")
1652
1653            master = req.get('master', None)
1654            if not master:
1655                raise service_error(service_error.req,
1656                        "No master testbed label")
1657            export_project = req.get('exportProject', None)
1658            if not export_project:
1659                raise service_error(service_error.req, "No export project")
1660           
1661            if self.splitter_url:
1662                self.log.debug("Calling remote splitter at %s" % \
1663                        self.splitter_url)
1664                split_data = self.remote_splitter(self.splitter_url,
1665                        file_content, master)
1666            else:
1667                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1668                    str(self.muxmax), '-m', master]
1669
1670                if self.fedkit:
1671                    tclcmd.append('-k')
1672
1673                if self.gatewaykit:
1674                    tclcmd.append('-K')
1675
1676                tclcmd.extend([pid, gid, eid, tclfile])
1677
1678                self.log.debug("running local splitter %s", " ".join(tclcmd))
1679                tclparser = Popen(tclcmd, stdout=PIPE)
1680                split_data = tclparser.stdout
1681
1682            allocated = { }         # Testbeds we can access
1683            started = { }           # Testbeds where a sub-experiment started
1684                                # successfully
1685
1686            # Objects to parse the splitter output (defined above)
1687            parse_current_testbed = self.current_testbed(eid, tmpdir,
1688                    self.fedkit, self.gatewaykit)
1689            parse_allbeds = self.allbeds(self.get_access)
1690            parse_gateways = self.gateways(eid, master, tmpdir,
1691                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1692                    self.fedkit)
1693            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1694                        "^#\s+End\s+Vtopo")
1695            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1696                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1697            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1698                    "^#\s+End\s+tarfiles")
1699            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1700                    "^#\s+End\s+rpms")
1701
1702            # Working on the split data
1703            for line in split_data:
1704                line = line.rstrip()
1705                if parse_current_testbed(line, master, allocated, tbparams):
1706                    continue
1707                elif parse_allbeds(line, user, tbparams, master, export_project,
1708                        access_user):
1709                    continue
1710                elif parse_gateways(line, allocated, tbparams):
1711                    continue
1712                elif parse_vtopo(line):
1713                    continue
1714                elif parse_hostnames(line):
1715                    continue
1716                elif parse_tarfiles(line):
1717                    continue
1718                elif parse_rpms(line):
1719                    continue
1720                else:
1721                    raise service_error(service_error.internal, 
1722                            "Bad tcl parse? %s" % line)
1723            # Virtual topology and visualization
1724            vtopo = self.gentopo(parse_vtopo.str)
1725            if not vtopo:
1726                raise service_error(service_error.internal, 
1727                        "Failed to generate virtual topology")
1728
1729            vis = self.genviz(vtopo)
1730            if not vis:
1731                raise service_error(service_error.internal, 
1732                        "Failed to generate visualization")
1733           
1734            # save federant information
1735            for k in allocated.keys():
1736                tbparams[k]['federant'] = {\
1737                        'name': [ { 'localname' : eid} ],\
1738                        'emulab': tbparams[k]['emulab'],\
1739                        'allocID' : tbparams[k]['allocID'],\
1740                        'master' : k == master,\
1741                    }
1742
1743
1744            # Copy tarfiles and rpms needed at remote sites into a staging area
1745            try:
1746                if self.fedkit:
1747                    for t in self.fedkit:
1748                        parse_tarfiles.list.append(t[1])
1749                if self.gatewaykit:
1750                    for t in self.gatewaykit:
1751                        parse_tarfiles.list.append(t[1])
1752                for t in parse_tarfiles.list:
1753                    if not os.path.exists("%s/tarfiles" % tmpdir):
1754                        os.mkdir("%s/tarfiles" % tmpdir)
1755                    self.copy_file(t, "%s/tarfiles/%s" % \
1756                            (tmpdir, os.path.basename(t)))
1757                for r in parse_rpms.list:
1758                    if not os.path.exists("%s/rpms" % tmpdir):
1759                        os.mkdir("%s/rpms" % tmpdir)
1760                    self.copy_file(r, "%s/rpms/%s" % \
1761                            (tmpdir, os.path.basename(r)))
1762            except IOError, e:
1763                raise service_error(service_error.internal, 
1764                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1765
1766        except service_error, e:
1767            # If something goes wrong in the parse (usually an access error)
1768            # clear the placeholder state.  From here on out the code delays
1769            # exceptions.
1770            self.state_lock.acquire()
1771            del self.state[eid]
1772            self.state_lock.release()
1773            raise e
1774
1775        thread_pool = self.thread_pool(self.nthreads)
1776        threads = [ ]
1777
1778        for tb in [ k for k in allocated.keys() if k != master]:
1779            # Create and start a thread to start the segment, and save it to
1780            # get the return value later
1781            thread_pool.wait_for_slot()
1782            t  = self.pooled_thread(target=self.start_segment, 
1783                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1784                    pdata=thread_pool, trace_file=self.trace_file)
1785            threads.append(t)
1786            t.start()
1787
1788        # Wait until all finish
1789        thread_pool.wait_for_all_done()
1790
1791        # If none failed, start the master
1792        failed = [ t.getName() for t in threads if not t.rv ]
1793
1794        if len(failed) == 0:
1795            if not self.start_segment(master, eid, tbparams, tmpdir):
1796                failed.append(master)
1797
1798        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1799        # If one failed clean up, unless fail_soft is set
1800        if failed:
1801            if not fail_soft:
1802                thread_pool.clear()
1803                for tb in succeeded:
1804                    # Create and start a thread to stop the segment
1805                    thread_pool.wait_for_slot()
1806                    t  = self.pooled_thread(target=self.stop_segment, 
1807                            args=(tb, eid, tbparams), name=tb,
1808                            pdata=thread_pool, trace_file=self.trace_file)
1809                    t.start()
1810                # Wait until all finish
1811                thread_pool.wait_for_all_done()
1812
1813                # release the allocations
1814                for tb in tbparams.keys():
1815                    self.release_access(tb, tbparams[tb]['allocID'])
1816                # Remove the placeholder
1817                self.state_lock.acquire()
1818                del self.state[eid]
1819                self.state_lock.release()
1820
1821                raise service_error(service_error.federant,
1822                    "Swap in failed on %s" % ",".join(failed))
1823        else:
1824            self.log.info("[start_segment]: Experiment %s started" % eid)
1825
1826        # Generate an ID for the experiment (slice) and a certificate that the
1827        # allocator can use to prove they own it.  We'll ship it back through
1828        # the encrypted connection.
1829        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1830
1831        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1832
1833        # Walk up tmpdir, deleting as we go
1834        for path, dirs, files in os.walk(tmpdir, topdown=False):
1835            for f in files:
1836                os.remove(os.path.join(path, f))
1837            for d in dirs:
1838                os.rmdir(os.path.join(path, d))
1839        os.rmdir(tmpdir)
1840
1841        # The deepcopy prevents the allocation ID and other binaries from being
1842        # translated into other formats
1843        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1844                for tb in tbparams.keys() \
1845                    if tbparams[tb].has_key('federant') ],\
1846                    'vtopo': vtopo,\
1847                    'vis' : vis,
1848                    'experimentID' : [\
1849                            { 'fedid': copy.copy(expid) }, \
1850                            { 'localname': eid },\
1851                        ],\
1852                    'experimentAccess': { 'X509' : expcert },\
1853                }
1854        # remove the allocationID info from each federant
1855        for f in resp['federant']:
1856            if f.has_key('allocID'): del f['allocID']
1857
1858        # Insert the experiment into our state and update the disk copy
1859        self.state_lock.acquire()
1860        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1861                for tb in tbparams.keys() \
1862                    if tbparams[tb].has_key('federant') ],\
1863                    'vtopo': vtopo,\
1864                    'vis' : vis,
1865                    'owner': fid,
1866                    'experimentID' : [\
1867                            { 'fedid': expid }, { 'localname': eid },\
1868                        ],\
1869                }
1870        self.state[eid] = self.state[expid]
1871        if self.state_filename: self.write_state()
1872        self.state_lock.release()
1873
1874        self.auth.set_attribute(fid, expid)
1875        self.auth.set_attribute(expid, expid)
1876
1877        if not failed:
1878            return resp
1879        else:
1880            raise service_error(service_error.partial, \
1881                    "Partial swap in on %s" % ",".join(succeeded))
1882
1883    def check_experiment_access(self, fid, key):
1884        """
1885        Confirm that the fid has access to the experiment.  Though a request
1886        may be made in terms of a local name, the access attribute is always
1887        the experiment's fedid.
1888        """
1889        if not isinstance(key, fedid):
1890            self.state_lock.acquire()
1891            if self.state.has_key(key):
1892                if isinstance(self.state[key], dict):
1893                    try:
1894                        kl = [ f['fedid'] for f in \
1895                                self.state[key]['experimentID']\
1896                                    if f.has_key('fedid') ]
1897                    except KeyError:
1898                        self.state_lock.release()
1899                        raise service_error(service_error.internal, 
1900                                "No fedid for experiment %s when checking " +\
1901                                        "access(!?)" % key)
1902                    if len(kl) == 1:
1903                        key = kl[0]
1904                    else:
1905                        self.state_lock.release()
1906                        raise service_error(service_error.internal, 
1907                                "multiple fedids for experiment %s when " +\
1908                                        "checking access(!?)" % key)
1909                elif isinstance(self.state[key], str):
1910                    self.state_lock.release()
1911                    raise service_error(service_error.internal, 
1912                            ("experiment %s is placeholder.  " +\
1913                                    "Creation in progress or aborted oddly") \
1914                                    % key)
1915                else:
1916                    self.state_lock.release()
1917                    raise service_error(service_error.internal, 
1918                            "Unexpected state for %s" % key)
1919
1920            else:
1921                self.state_lock.release()
1922                raise service_error(service_error.access, "Access Denied")
1923            self.state_lock.release()
1924
1925        if self.auth.check_attribute(fid, key):
1926            return True
1927        else:
1928            raise service_error(service_error.access, "Access Denied")
1929
1930
1931
1932    def get_vtopo(self, req, fid):
1933        """
1934        Return the stored virtual topology for this experiment
1935        """
1936        rv = None
1937
1938        req = req.get('VtopoRequestBody', None)
1939        if not req:
1940            raise service_error(service_error.req,
1941                    "Bad request format (no VtopoRequestBody)")
1942        exp = req.get('experiment', None)
1943        if exp:
1944            if exp.has_key('fedid'):
1945                key = exp['fedid']
1946                keytype = "fedid"
1947            elif exp.has_key('localname'):
1948                key = exp['localname']
1949                keytype = "localname"
1950            else:
1951                raise service_error(service_error.req, "Unknown lookup type")
1952        else:
1953            raise service_error(service_error.req, "No request?")
1954
1955        self.check_experiment_access(fid, key)
1956
1957        self.state_lock.acquire()
1958        if self.state.has_key(key):
1959            rv = { 'experiment' : {keytype: key },\
1960                    'vtopo': self.state[key]['vtopo'],\
1961                }
1962        self.state_lock.release()
1963
1964        if rv: return rv
1965        else: raise service_error(service_error.req, "No such experiment")
1966
1967    def get_vis(self, req, fid):
1968        """
1969        Return the stored visualization for this experiment
1970        """
1971        rv = None
1972
1973        req = req.get('VisRequestBody', None)
1974        if not req:
1975            raise service_error(service_error.req,
1976                    "Bad request format (no VisRequestBody)")
1977        exp = req.get('experiment', None)
1978        if exp:
1979            if exp.has_key('fedid'):
1980                key = exp['fedid']
1981                keytype = "fedid"
1982            elif exp.has_key('localname'):
1983                key = exp['localname']
1984                keytype = "localname"
1985            else:
1986                raise service_error(service_error.req, "Unknown lookup type")
1987        else:
1988            raise service_error(service_error.req, "No request?")
1989
1990        self.check_experiment_access(fid, key)
1991
1992        self.state_lock.acquire()
1993        if self.state.has_key(key):
1994            rv =  { 'experiment' : {keytype: key },\
1995                    'vis': self.state[key]['vis'],\
1996                    }
1997        self.state_lock.release()
1998
1999        if rv: return rv
2000        else: raise service_error(service_error.req, "No such experiment")
2001
2002    def get_info(self, req, fid):
2003        """
2004        Return all the stored info about this experiment
2005        """
2006        rv = None
2007
2008        req = req.get('InfoRequestBody', None)
2009        if not req:
2010            raise service_error(service_error.req,
2011                    "Bad request format (no VisRequestBody)")
2012        exp = req.get('experiment', None)
2013        if exp:
2014            if exp.has_key('fedid'):
2015                key = exp['fedid']
2016                keytype = "fedid"
2017            elif exp.has_key('localname'):
2018                key = exp['localname']
2019                keytype = "localname"
2020            else:
2021                raise service_error(service_error.req, "Unknown lookup type")
2022        else:
2023            raise service_error(service_error.req, "No request?")
2024
2025        self.check_experiment_access(fid, key)
2026
2027        # The state may be massaged by the service function that called
2028        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2029        # state.
2030        self.state_lock.acquire()
2031        if self.state.has_key(key):
2032            rv = copy.deepcopy(self.state[key])
2033        self.state_lock.release()
2034        # Remove the owner info
2035        del rv['owner']
2036        # remove the allocationID info from each federant
2037        for f in rv['federant']:
2038            if f.has_key('allocID'): del f['allocID']
2039
2040        if rv: return rv
2041        else: raise service_error(service_error.req, "No such experiment")
2042
2043
2044    def terminate_experiment(self, req, fid):
2045        """
2046        Swap this experiment out on the federants and delete the shared
2047        information
2048        """
2049        tbparams = { }
2050        req = req.get('TerminateRequestBody', None)
2051        if not req:
2052            raise service_error(service_error.req,
2053                    "Bad request format (no TerminateRequestBody)")
2054        exp = req.get('experiment', None)
2055        if exp:
2056            if exp.has_key('fedid'):
2057                key = exp['fedid']
2058                keytype = "fedid"
2059            elif exp.has_key('localname'):
2060                key = exp['localname']
2061                keytype = "localname"
2062            else:
2063                raise service_error(service_error.req, "Unknown lookup type")
2064        else:
2065            raise service_error(service_error.req, "No request?")
2066
2067        self.check_experiment_access(fid, key)
2068
2069        self.state_lock.acquire()
2070        fed_exp = self.state.get(key, None)
2071
2072        if fed_exp:
2073            # This branch of the conditional holds the lock to generate a
2074            # consistent temporary tbparams variable to deallocate experiments.
2075            # It releases the lock to do the deallocations and reacquires it to
2076            # remove the experiment state when the termination is complete.
2077            ids = []
2078            #  experimentID is a list of dicts that are self-describing
2079            #  identifiers.  This finds all the fedids and localnames - the
2080            #  keys of self.state - and puts them into ids.
2081            for id in fed_exp.get('experimentID', []):
2082                if id.has_key('fedid'): ids.append(id['fedid'])
2083                if id.has_key('localname'): ids.append(id['localname'])
2084
2085            # Construct enough of the tbparams to make the stop_segment calls
2086            # work
2087            for fed in fed_exp['federant']:
2088                try:
2089                    for e in fed['name']:
2090                        eid = e.get('localname', None)
2091                        if eid: break
2092                    else:
2093                        continue
2094
2095                    p = fed['emulab']['project']
2096
2097                    project = p['name']['localname']
2098                    tb = p['testbed']['localname']
2099                    user = p['user'][0]['userID']['localname']
2100
2101                    domain = fed['emulab']['domain']
2102                    host  = fed['emulab']['ops']
2103                    aid = fed['allocID']
2104                except KeyError, e:
2105                    continue
2106                tbparams[tb] = {\
2107                        'user': user,\
2108                        'domain': domain,\
2109                        'project': project,\
2110                        'host': host,\
2111                        'eid': eid,\
2112                        'aid': aid,\
2113                    }
2114            self.state_lock.release()
2115
2116            # Stop everyone.
2117            thread_pool = self.thread_pool(self.nthreads)
2118            for tb in tbparams.keys():
2119                # Create and start a thread to stop the segment
2120                thread_pool.wait_for_slot()
2121                t  = self.pooled_thread(target=self.stop_segment, 
2122                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2123                        pdata=thread_pool, trace_file=self.trace_file)
2124                t.start()
2125            # Wait for completions
2126            thread_pool.wait_for_all_done()
2127
2128            # release the allocations
2129            for tb in tbparams.keys():
2130                self.release_access(tb, tbparams[tb]['aid'])
2131
2132            # Remove the terminated experiment
2133            self.state_lock.acquire()
2134            for id in ids:
2135                if self.state.has_key(id): del self.state[id]
2136
2137            if self.state_filename: self.write_state()
2138            self.state_lock.release()
2139
2140            return { 'experiment': exp }
2141        else:
2142            # Don't forget to release the lock
2143            self.state_lock.release()
2144            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.