source: fedd/federation/experiment_control.py @ b8a9fb7

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since b8a9fb7 was b8a9fb7, checked in by Ted Faber <faber@…>, 15 years ago

Make the fedkit and gatewaykit configuration parameters into lists of destination directories and tarfiles rather than a single tarfile.

  • Property mode set to 100644
File size: 64.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167
168        def parse_tarfile_list(tf):
169            """
170            Parse a tarfile list from the configuration.  This is a set of
171            paths and tarfiles separated by spaces.
172            """
173            rv = [ ]
174            if tf is not None:
175                tl = tf.split()
176                while len(tl) > 1:
177                    p, t = tl[0:2]
178                    del tl[0:2]
179                    rv.append((p, t))
180            return rv
181
182        self.thread_with_rv = experiment_control_local.pooled_thread
183        self.thread_pool = experiment_control_local.thread_pool
184
185        self.cert_file = config.get("experiment_control", "cert_file")
186        if self.cert_file:
187            self.cert_pwd = config.get("experiment_control", "cert_pwd")
188        else:
189            self.cert_file = config.get("globals", "cert_file")
190            self.cert_pwd = config.get("globals", "cert_pwd")
191
192        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
193                or config.get("globals", "trusted_certs")
194
195        self.exp_stem = "fed-stem"
196        self.log = logging.getLogger("fedd.experiment_control")
197        set_log_level(config, "experiment_control", self.log)
198        self.muxmax = 2
199        self.nthreads = 2
200        self.randomize_experiments = False
201
202        self.scp_exec = "/usr/bin/scp"
203        self.splitter = None
204        self.ssh_exec="/usr/bin/ssh"
205        self.ssh_keygen = "/usr/bin/ssh-keygen"
206        self.ssh_identity_file = None
207
208
209        self.debug = config.getboolean("experiment_control", "create_debug")
210        self.state_filename = config.get("experiment_control", 
211                "experiment_state")
212        self.splitter_url = config.get("experiment_control", "splitter_uri")
213        self.fedkit = parse_tarfile_list(\
214                config.get("experiment_control", "fedkit"))
215        self.gatewaykit = parse_tarfile_list(\
216                config.get("experiment_control", "gatewaykit"))
217        accessdb_file = config.get("experiment_control", "accessdb")
218
219        self.ssh_pubkey_file = config.get("experiment_control", 
220                "ssh_pubkey_file")
221        self.ssh_privkey_file = config.get("experiment_control",
222                "ssh_privkey_file")
223        # NB for internal master/slave ops, not experiment setup
224        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
225        self.state = { }
226        self.state_lock = Lock()
227        self.tclsh = "/usr/local/bin/otclsh"
228        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
229                config.get("experiment_control", "tcl_splitter",
230                        "/usr/testbed/lib/ns2ir/parse.tcl")
231        mapdb_file = config.get("experiment_control", "mapdb")
232        self.trace_file = sys.stderr
233
234        self.def_expstart = \
235                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
236                "/tmp/federate";
237        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
238                "FEDDIR/hosts";
239        self.def_gwstart = \
240                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
241                "/tmp/bridge.log";
242        self.def_mgwstart = \
243                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
244                "/tmp/bridge.log";
245        self.def_gwimage = "FBSD61-TUNNEL2";
246        self.def_gwtype = "pc";
247        self.local_access = { }
248
249        if auth:
250            self.auth = auth
251        else:
252            self.log.error(\
253                    "[access]: No authorizer initialized, creating local one.")
254            auth = authorizer()
255
256
257        if self.ssh_pubkey_file:
258            try:
259                f = open(self.ssh_pubkey_file, 'r')
260                self.ssh_pubkey = f.read()
261                f.close()
262            except IOError:
263                raise service_error(service_error.internal,
264                        "Cannot read sshpubkey")
265        else:
266            raise service_error(service_error.internal, 
267                    "No SSH public key file?")
268
269        if not self.ssh_privkey_file:
270            raise service_error(service_error.internal, 
271                    "No SSH public key file?")
272
273
274        if mapdb_file:
275            self.read_mapdb(mapdb_file)
276        else:
277            self.log.warn("[experiment_control] No testbed map, using defaults")
278            self.tbmap = { 
279                    'deter':'https://users.isi.deterlab.net:23235',
280                    'emulab':'https://users.isi.deterlab.net:23236',
281                    'ucb':'https://users.isi.deterlab.net:23237',
282                    }
283
284        if accessdb_file:
285                self.read_accessdb(accessdb_file)
286        else:
287            raise service_error(service_error.internal,
288                    "No accessdb specified in config")
289
290        # Grab saved state.  OK to do this w/o locking because it's read only
291        # and only one thread should be in existence that can see self.state at
292        # this point.
293        if self.state_filename:
294            self.read_state()
295
296        # Dispatch tables
297        self.soap_services = {\
298                'Create': soap_handler('Create', self.create_experiment),
299                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
300                'Vis': soap_handler('Vis', self.get_vis),
301                'Info': soap_handler('Info', self.get_info),
302                'Terminate': soap_handler('Terminate', 
303                    self.terminate_experiment),
304        }
305
306        self.xmlrpc_services = {\
307                'Create': xmlrpc_handler('Create', self.create_experiment),
308                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
309                'Vis': xmlrpc_handler('Vis', self.get_vis),
310                'Info': xmlrpc_handler('Info', self.get_info),
311                'Terminate': xmlrpc_handler('Terminate',
312                    self.terminate_experiment),
313        }
314
315    def copy_file(self, src, dest, size=1024):
316        """
317        Exceedingly simple file copy.
318        """
319        s = open(src,'r')
320        d = open(dest, 'w')
321
322        buf = "x"
323        while buf != "":
324            buf = s.read(size)
325            d.write(buf)
326        s.close()
327        d.close()
328
329    # Call while holding self.state_lock
330    def write_state(self):
331        """
332        Write a new copy of experiment state after copying the existing state
333        to a backup.
334
335        State format is a simple pickling of the state dictionary.
336        """
337        if os.access(self.state_filename, os.W_OK):
338            self.copy_file(self.state_filename, \
339                    "%s.bak" % self.state_filename)
340        try:
341            f = open(self.state_filename, 'w')
342            pickle.dump(self.state, f)
343        except IOError, e:
344            self.log.error("Can't write file %s: %s" % \
345                    (self.state_filename, e))
346        except pickle.PicklingError, e:
347            self.log.error("Pickling problem: %s" % e)
348        except TypeError, e:
349            self.log.error("Pickling problem (TypeError): %s" % e)
350
351    # Call while holding self.state_lock
352    def read_state(self):
353        """
354        Read a new copy of experiment state.  Old state is overwritten.
355
356        State format is a simple pickling of the state dictionary.
357        """
358        try:
359            f = open(self.state_filename, "r")
360            self.state = pickle.load(f)
361            self.log.debug("[read_state]: Read state from %s" % \
362                    self.state_filename)
363        except IOError, e:
364            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
365                    % (self.state_filename, e))
366        except pickle.UnpicklingError, e:
367            self.log.warning(("[read_state]: No saved state: " + \
368                    "Unpickling failed: %s") % e)
369       
370        for k in self.state.keys():
371            try:
372                # This list should only have one element in it, but phrasing it
373                # as a for loop doesn't cost much, really.  We have to find the
374                # fedid elements anyway.
375                for eid in [ f['fedid'] \
376                        for f in self.state[k]['experimentID']\
377                            if f.has_key('fedid') ]:
378                    self.auth.set_attribute(self.state[k]['owner'], eid)
379            except KeyError, e:
380                self.log.warning("[read_state]: State ownership or identity " +\
381                        "misformatted in %s: %s" % (self.state_filename, e))
382
383
384    def read_accessdb(self, accessdb_file):
385        """
386        Read the mapping from fedids that can create experiments to their name
387        in the 3-level access namespace.  All will be asserted from this
388        testbed and can include the local username and porject that will be
389        asserted on their behalf by this fedd.  Each fedid is also added to the
390        authorization system with the "create" attribute.
391        """
392        self.accessdb = {}
393        # These are the regexps for parsing the db
394        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
395        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
396                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
397        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
398                "\s*->\s*(" + name_expr + ")\s*$")
399        lineno = 0
400
401        # Parse the mappings and store in self.authdb, a dict of
402        # fedid -> (proj, user)
403        try:
404            f = open(accessdb_file, "r")
405            for line in f:
406                lineno += 1
407                line = line.strip()
408                if len(line) == 0 or line.startswith('#'):
409                    continue
410                m = project_line.match(line)
411                if m:
412                    fid = fedid(hexstr=m.group(1))
413                    project, user = m.group(2,3)
414                    if not self.accessdb.has_key(fid):
415                        self.accessdb[fid] = []
416                    self.accessdb[fid].append((project, user))
417                    continue
418
419                m = user_line.match(line)
420                if m:
421                    fid = fedid(hexstr=m.group(1))
422                    project = None
423                    user = m.group(2)
424                    if not self.accessdb.has_key(fid):
425                        self.accessdb[fid] = []
426                    self.accessdb[fid].append((project, user))
427                    continue
428                self.log.warn("[experiment_control] Error parsing access " +\
429                        "db %s at line %d" %  (accessdb_file, lineno))
430        except IOError:
431            raise service_error(service_error.internal,
432                    "Error opening/reading %s as experiment " +\
433                            "control accessdb" %  accessdb_file)
434        f.close()
435
436        # Initialize the authorization attributes
437        for fid in self.accessdb.keys():
438            self.auth.set_attribute(fid, 'create')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except IOError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def scp_file(self, file, user, host, dest=""):
467        """
468        scp a file to the remote host.  If debug is set the action is only
469        logged.
470        """
471
472        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
473                '-o', 'StrictHostKeyChecking yes', '-i', 
474                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
475        rv = 0
476
477        try:
478            dnull = open("/dev/null", "w")
479        except IOError:
480            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
481            dnull = Null
482
483        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
484        if not self.debug:
485            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
486
487        return rv == 0
488
489    def ssh_cmd(self, user, host, cmd, wname=None):
490        """
491        Run a remote command on host as user.  If debug is set, the action is
492        only logged.
493        """
494        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
495                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
496                (self.ssh_exec, self.ssh_privkey_file, 
497                        user, host, cmd)
498
499        try:
500            dnull = open("/dev/null", "r")
501        except IOError:
502            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
503            dnull = Null
504
505        self.log.debug("[ssh_cmd]: %s" % sh_str)
506        if not self.debug:
507            if dnull:
508                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
509            else:
510                sub = Popen(sh_str, shell=True)
511            return sub.wait() == 0
512        else:
513            return True
514
515    def ship_configs(self, host, user, src_dir, dest_dir):
516        """
517        Copy federant-specific configuration files to the federant.
518        """
519        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
520            return False
521        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
522            return False
523
524        for f in os.listdir(src_dir):
525            if os.path.isdir(f):
526                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
527                        "%s/%s" % (dest_dir, f)):
528                    return False
529            else:
530                if not self.scp_file("%s/%s" % (src_dir, f), 
531                        user, host, dest_dir):
532                    return False
533        return True
534
535    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
536        """
537        Start a sub-experiment on a federant.
538
539        Get the current state, modify or create as appropriate, ship data and
540        configs and start the experiment.  There are small ordering differences
541        based on the initial state of the sub-experiment.
542        """
543        # ops node in the federant
544        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
545        user = tbparams[tb]['user']     # federant user
546        pid = tbparams[tb]['project']   # federant project
547        # XXX
548        base_confs = ( "hosts",)
549        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
550        # command to test experiment state
551        expinfo_exec = "/usr/testbed/bin/expinfo" 
552        # Configuration directories on the remote machine
553        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
554        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
555        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
556        # Regular expressions to parse the expinfo response
557        state_re = re.compile("State:\s+(\w+)")
558        no_exp_re = re.compile("^No\s+such\s+experiment")
559        state = None    # Experiment state parsed from expinfo
560        # The expinfo ssh command.  Note the identity restriction to use only
561        # the identity provided in the pubkey given.
562        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
563                'StrictHostKeyChecking yes', '-i', 
564                self.ssh_privkey_file, "%s@%s" % (user, host), 
565                expinfo_exec, pid, eid]
566
567        # Get status
568        self.log.debug("[start_segment]: %s"% " ".join(cmd))
569        dev_null = None
570        try:
571            dev_null = open("/dev/null", "a")
572        except IOError, e:
573            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
574
575        if self.debug:
576            state = 'swapped'
577            rv = 0
578        else:
579            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
580            for line in status.stdout:
581                m = state_re.match(line)
582                if m: state = m.group(1)
583                else:
584                    m = no_exp_re.match(line)
585                    if m: state = "none"
586            rv = status.wait()
587
588        # If the experiment is not present the subcommand returns a non-zero
589        # return value.  If we successfully parsed a "none" outcome, ignore the
590        # return code.
591        if rv != 0 and state != "none":
592            raise service_error(service_error.internal,
593                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
594
595        self.log.debug("[start_segment]: %s: %s" % (tb, state))
596        self.log.info("[start_segment]:transferring experiment to %s" % tb)
597
598        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
599            return False
600        # Clear the federation config dirs
601        if not self.ssh_cmd(user, host, 
602                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
603            return False
604        # Clear and create the tarfiles and rpm directories
605        for d in (tarfiles_dir, rpms_dir):
606            if not self.ssh_cmd(user, host, 
607                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
608                return False
609            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
610                    "create tarfiles"):
611                return False
612       
613        if state == 'active':
614            # Create the federation config dirs (do not move outside the
615            # conditional.  Happens later in new expriment creation)
616            if not self.ssh_cmd(user, host, 
617                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
618                return False
619            # Remote experiment is active.  Modify it.
620            for f in base_confs:
621                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
622                        "%s/%s" % (proj_dir, f)):
623                    return False
624            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
625                    proj_dir):
626                return False
627            if os.path.isdir("%s/tarfiles" % tmpdir):
628                if not self.ship_configs(host, user,
629                        "%s/tarfiles" % tmpdir, tarfiles_dir):
630                    return False
631            if os.path.isdir("%s/rpms" % tmpdir):
632                if not self.ship_configs(host, user,
633                        "%s/rpms" % tmpdir, tarfiles_dir):
634                    return False
635            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
636            if not self.ssh_cmd(user, host,
637                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
638                            (pid, eid, tclfile), "modexp"):
639                return False
640            return True
641        elif state == "swapped":
642            # Create the federation config dirs (do not move outside the
643            # conditional.  Happens later in new expriment creation)
644            if not self.ssh_cmd(user, host, 
645                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
646                return False
647            # Remote experiment swapped out.  Modify it and swap it in.
648            for f in base_confs:
649                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
650                        "%s/%s" % (proj_dir, f)):
651                    return False
652            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
653                    proj_dir):
654                return False
655            if os.path.isdir("%s/tarfiles" % tmpdir):
656                if not self.ship_configs(host, user,
657                        "%s/tarfiles" % tmpdir, tarfiles_dir):
658                    return False
659            if os.path.isdir("%s/rpms" % tmpdir):
660                if not self.ship_configs(host, user,
661                        "%s/rpms" % tmpdir, tarfiles_dir):
662                    return False
663            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
664            if not self.ssh_cmd(user, host,
665                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
666                    "modexp"):
667                return False
668            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
669            if not self.ssh_cmd(user, host,
670                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
671                    "swapexp"):
672                return False
673            return True
674        elif state == "none":
675            # No remote experiment.  Create one.  We do this in 2 steps so we
676            # can put the configuration files and scripts into the new
677            # experiment directories.
678
679            # Tarfiles must be present for creation to work
680            if os.path.isdir("%s/tarfiles" % tmpdir):
681                if not self.ship_configs(host, user,
682                        "%s/tarfiles" % tmpdir, tarfiles_dir):
683                    return False
684            if os.path.isdir("%s/rpms" % tmpdir):
685                if not self.ship_configs(host, user,
686                        "%s/rpms" % tmpdir, tarfiles_dir):
687                    return False
688            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
689            if not self.ssh_cmd(user, host,
690                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
691                            (pid, eid, tclfile), "startexp"):
692                return False
693            # Create the federation config dirs (do not move outside the
694            # conditional.)
695            if not self.ssh_cmd(user, host, 
696                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
697                return False
698            # After startexp the per-experiment directories exist
699            for f in base_confs:
700                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
701                        "%s/%s" % (proj_dir, f)):
702                    return False
703            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
704                    proj_dir):
705                return False
706            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
707            if not self.ssh_cmd(user, host,
708                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
709                    "swapexp"):
710                return False
711            return True
712        else:
713            self.log.debug("[start_segment]:unknown state %s" % state)
714            return False
715
716    def stop_segment(self, tb, eid, tbparams):
717        """
718        Stop a sub experiment by calling swapexp on the federant
719        """
720        user = tbparams[tb]['user']
721        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
722        pid = tbparams[tb]['project']
723
724        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
725        return self.ssh_cmd(user, host,
726                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
727
728       
729    def generate_ssh_keys(self, dest, type="rsa" ):
730        """
731        Generate a set of keys for the gateways to use to talk.
732
733        Keys are of type type and are stored in the required dest file.
734        """
735        valid_types = ("rsa", "dsa")
736        t = type.lower();
737        if t not in valid_types: raise ValueError
738        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
739
740        try:
741            trace = open("/dev/null", "w")
742        except IOError:
743            raise service_error(service_error.internal,
744                    "Cannot open /dev/null??");
745
746        # May raise CalledProcessError
747        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
748        rv = call(cmd, stdout=trace, stderr=trace)
749        if rv != 0:
750            raise service_error(service_error.internal, 
751                    "Cannot generate nonce ssh keys.  %s return code %d" \
752                            % (self.ssh_keygen, rv))
753
754    def gentopo(self, str):
755        """
756        Generate the topology dtat structure from the splitter's XML
757        representation of it.
758
759        The topology XML looks like:
760            <experiment>
761                <nodes>
762                    <node><vname></vname><ips>ip1:ip2</ips></node>
763                </nodes>
764                <lans>
765                    <lan>
766                        <vname></vname><vnode></vnode><ip></ip>
767                        <bandwidth></bandwidth><member>node:port</member>
768                    </lan>
769                </lans>
770        """
771        class topo_parse:
772            """
773            Parse the topology XML and create the dats structure.
774            """
775            def __init__(self):
776                # Typing of the subelements for data conversion
777                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
778                self.int_subelements = ( 'bandwidth',)
779                self.float_subelements = ( 'delay',)
780                # The final data structure
781                self.nodes = [ ]
782                self.lans =  [ ]
783                self.topo = { \
784                        'node': self.nodes,\
785                        'lan' : self.lans,\
786                    }
787                self.element = { }  # Current element being created
788                self.chars = ""     # Last text seen
789
790            def end_element(self, name):
791                # After each sub element the contents is added to the current
792                # element or to the appropriate list.
793                if name == 'node':
794                    self.nodes.append(self.element)
795                    self.element = { }
796                elif name == 'lan':
797                    self.lans.append(self.element)
798                    self.element = { }
799                elif name in self.str_subelements:
800                    self.element[name] = self.chars
801                    self.chars = ""
802                elif name in self.int_subelements:
803                    self.element[name] = int(self.chars)
804                    self.chars = ""
805                elif name in self.float_subelements:
806                    self.element[name] = float(self.chars)
807                    self.chars = ""
808
809            def found_chars(self, data):
810                self.chars += data.rstrip()
811
812
813        tp = topo_parse();
814        parser = xml.parsers.expat.ParserCreate()
815        parser.EndElementHandler = tp.end_element
816        parser.CharacterDataHandler = tp.found_chars
817
818        parser.Parse(str)
819
820        return tp.topo
821       
822
823    def genviz(self, topo):
824        """
825        Generate the visualization the virtual topology
826        """
827
828        neato = "/usr/local/bin/neato"
829        # These are used to parse neato output and to create the visualization
830        # file.
831        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
832        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
833                "%s</type></node>"
834
835        try:
836            # Node names
837            nodes = [ n['vname'] for n in topo['node'] ]
838            topo_lans = topo['lan']
839        except KeyError:
840            raise service_error(service_error.internal, "Bad topology")
841
842        lans = { }
843        links = { }
844
845        # Walk through the virtual topology, organizing the connections into
846        # 2-node connections (links) and more-than-2-node connections (lans).
847        # When a lan is created, it's added to the list of nodes (there's a
848        # node in the visualization for the lan).
849        for l in topo_lans:
850            if links.has_key(l['vname']):
851                if len(links[l['vname']]) < 2:
852                    links[l['vname']].append(l['vnode'])
853                else:
854                    nodes.append(l['vname'])
855                    lans[l['vname']] = links[l['vname']]
856                    del links[l['vname']]
857                    lans[l['vname']].append(l['vnode'])
858            elif lans.has_key(l['vname']):
859                lans[l['vname']].append(l['vnode'])
860            else:
861                links[l['vname']] = [ l['vnode'] ]
862
863
864        # Open up a temporary file for dot to turn into a visualization
865        try:
866            df, dotname = tempfile.mkstemp()
867            dotfile = os.fdopen(df, 'w')
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Failed to open file in genviz")
871
872        # Generate a dot/neato input file from the links, nodes and lans
873        try:
874            print >>dotfile, "graph G {"
875            for n in nodes:
876                print >>dotfile, '\t"%s"' % n
877            for l in links.keys():
878                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
879            for l in lans.keys():
880                for n in lans[l]:
881                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
882            print >>dotfile, "}"
883            dotfile.close()
884        except TypeError:
885            raise service_error(service_error.internal,
886                    "Single endpoint link in vtopo")
887        except IOError:
888            raise service_error(service_error.internal, "Cannot write dot file")
889
890        # Use dot to create a visualization
891        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
892                '-Gpack=true', dotname], stdout=PIPE)
893
894        # Translate dot to vis format
895        vis_nodes = [ ]
896        vis = { 'node': vis_nodes }
897        for line in dot.stdout:
898            m = vis_re.match(line)
899            if m:
900                vn = m.group(1)
901                vis_node = {'name': vn, \
902                        'x': float(m.group(2)),\
903                        'y' : float(m.group(3)),\
904                    }
905                if vn in links.keys() or vn in lans.keys():
906                    vis_node['type'] = 'lan'
907                else:
908                    vis_node['type'] = 'node'
909                vis_nodes.append(vis_node)
910        rv = dot.wait()
911
912        os.remove(dotname)
913        if rv == 0 : return vis
914        else: return None
915
916    def get_access(self, tb, nodes, user, tbparam, master, export_project,
917            access_user):
918        """
919        Get access to testbed through fedd and set the parameters for that tb
920        """
921
922        # XXX This is needless complexity.  It must vanish.
923        translate_attr = {
924            'slavenodestartcmd': 'expstart',
925            'slaveconnectorstartcmd': 'gwstart',
926            'masternodestartcmd': 'mexpstart',
927            'masterconnectorstartcmd': 'mgwstart',
928            'connectorimage': 'gwimage',
929            'connectortype': 'gwtype',
930            'tunnelcfg': 'tun',
931            'tunnelinterface': 'tunnelinterface',
932            'smbshare': 'smbshare',
933            'slaveconnectorcmd': 'gwcmd',
934            'slaveconnectorcmdparams': 'gwcmdparams',
935            'masterconnectorcmd': 'mgwcmd',
936            'masterconnectorcmdparams': 'mgwcmdparams',
937        }
938
939        uri = self.tbmap.get(tb, None)
940        if not uri:
941            raise service_error(serice_error.server_config, 
942                    "Unknown testbed: %s" % tb)
943
944        # currently this lumps all users into one service access group
945        service_keys = [ a for u in user \
946                for a in u.get('access', []) \
947                    if a.has_key('sshPubkey')]
948
949        if len(service_keys) == 0:
950            raise service_error(service_error.req, 
951                    "Must have at least one SSH pubkey for services")
952
953
954        for p, u in access_user:
955            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
956                    "to %s") %  ((p or "None"), u, uri))
957
958            if p:
959                # Request with user and project specified
960                req = {\
961                        'destinationTestbed' : { 'uri' : uri },
962                        'project': { 
963                            'name': {'localname': p},
964                            'user': [ {'userID': { 'localname': u } } ],
965                            },
966                        'user':  user,
967                        'allocID' : { 'localname': 'test' },
968                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
969                        'serviceAccess' : service_keys
970                    }
971            else:
972                # Request with only user specified
973                req = {\
974                        'destinationTestbed' : { 'uri' : uri },
975                        'user':  [ {'userID': { 'localname': u } } ],
976                        'allocID' : { 'localname': 'test' },
977                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
978                        'serviceAccess' : service_keys
979                    }
980
981            if tb == master:
982                # NB, the export_project parameter is a dict that includes
983                # the type
984                req['exportProject'] = export_project
985
986            # node resources if any
987            if nodes != None and len(nodes) > 0:
988                rnodes = [ ]
989                for n in nodes:
990                    rn = { }
991                    image, hw, count = n.split(":")
992                    if image: rn['image'] = [ image ]
993                    if hw: rn['hardware'] = [ hw ]
994                    if count and int(count) >0 : rn['count'] = int(count)
995                    rnodes.append(rn)
996                req['resources']= { }
997                req['resources']['node'] = rnodes
998
999            try:
1000                if self.local_access.has_key(uri):
1001                    # Local access call
1002                    req = { 'RequestAccessRequestBody' : req }
1003                    r = self.local_access[uri].RequestAccess(req, 
1004                            fedid(file=self.cert_file))
1005                    r = { 'RequestAccessResponseBody' : r }
1006                else:
1007                    r = self.call_RequestAccess(uri, req, 
1008                            self.cert_file, self.cert_pwd, self.trusted_certs)
1009            except service_error, e:
1010                if e.code == service_error.access:
1011                    self.log.debug("[get_access] Access denied")
1012                    r = None
1013                    continue
1014                else:
1015                    raise e
1016
1017            if r.has_key('RequestAccessResponseBody'):
1018                # Through to here we have a valid response, not a fault.
1019                # Access denied is a fault, so something better or worse than
1020                # access denied has happened.
1021                r = r['RequestAccessResponseBody']
1022                self.log.debug("[get_access] Access granted")
1023                break
1024            else:
1025                raise service_error(service_error.protocol,
1026                        "Bad proxy response")
1027       
1028        if not r:
1029            raise service_error(service_error.access, 
1030                    "Access denied by %s (%s)" % (tb, uri))
1031
1032        e = r['emulab']
1033        p = e['project']
1034        tbparam[tb] = { 
1035                "boss": e['boss'],
1036                "host": e['ops'],
1037                "domain": e['domain'],
1038                "fs": e['fileServer'],
1039                "eventserver": e['eventServer'],
1040                "project": unpack_id(p['name']),
1041                "emulab" : e,
1042                "allocID" : r['allocID'],
1043                }
1044        # Make the testbed name be the label the user applied
1045        p['testbed'] = {'localname': tb }
1046
1047        for u in p['user']:
1048            role = u.get('role', None)
1049            if role == 'experimentCreation':
1050                tbparam[tb]['user'] = unpack_id(u['userID'])
1051                break
1052        else:
1053            raise service_error(service_error.internal, 
1054                    "No createExperimentUser from %s" %tb)
1055
1056        for a in e['fedAttr']:
1057            if a['attribute']:
1058                key = translate_attr.get(a['attribute'].lower(), None)
1059                if key:
1060                    tbparam[tb][key]= a['value']
1061       
1062    def release_access(self, tb, aid):
1063        """
1064        Release access to testbed through fedd
1065        """
1066
1067        uri = self.tbmap.get(tb, None)
1068        if not uri:
1069            raise service_error(serice_error.server_config, 
1070                    "Unknown testbed: %s" % tb)
1071
1072        if self.local_access.has_key(uri):
1073            resp = self.local_access[uri].ReleaseAccess(\
1074                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1075                    fedid(file=self.cert_file))
1076            resp = { 'ReleaseAccessResponseBody': resp } 
1077        else:
1078            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1079                    self.cert_file, self.cert_pwd, self.trusted_certs)
1080
1081        # better error coding
1082
1083    def remote_splitter(self, uri, desc, master):
1084
1085        req = {
1086                'description' : { 'ns2description': desc },
1087                'master': master,
1088                'include_fedkit': bool(self.fedkit),
1089                'include_gatewaykit': bool(self.gatewaykit)
1090            }
1091
1092        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1093                self.trusted_certs)
1094
1095        if r.has_key('Ns2SplitResponseBody'):
1096            r = r['Ns2SplitResponseBody']
1097            if r.has_key('output'):
1098                return r['output'].splitlines()
1099            else:
1100                raise service_error(service_error.protocol, 
1101                        "Bad splitter response (no output)")
1102        else:
1103            raise service_error(service_error.protocol, "Bad splitter response")
1104       
1105    class current_testbed:
1106        """
1107        Object for collecting the current testbed description.  The testbed
1108        description is saved to a file with the local testbed variables
1109        subsittuted line by line.
1110        """
1111        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1112            def tar_list_to_string(tl):
1113                if tl is None: return None
1114
1115                rv = ""
1116                for t in tl:
1117                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1118                            (t[0], os.path.basename(t[1]))
1119                return rv
1120
1121
1122            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1123            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1124            self.current_testbed = None
1125            self.testbed_file = None
1126
1127            self.def_expstart = \
1128                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1129            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1130            self.def_gwstart = \
1131                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1132            self.def_mgwstart = \
1133                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1134            self.def_gwimage = "FBSD61-TUNNEL2";
1135            self.def_gwtype = "pc";
1136            self.def_mgwcmd = '# '
1137            self.def_mgwcmdparams = ''
1138            self.def_gwcmd = '# '
1139            self.def_gwcmdparams = ''
1140
1141            self.eid = eid
1142            self.tmpdir = tmpdir
1143            # Convert fedkit and gateway kit (which are lists of tuples) into a
1144            # substituition string.
1145            self.fedkit = tar_list_to_string(fedkit)
1146            self.gatewaykit = tar_list_to_string(gatewaykit)
1147
1148        def __call__(self, line, master, allocated, tbparams):
1149            # Capture testbed topology descriptions
1150            if self.current_testbed == None:
1151                m = self.begin_testbed.match(line)
1152                if m != None:
1153                    self.current_testbed = m.group(1)
1154                    if self.current_testbed == None:
1155                        raise service_error(service_error.req,
1156                                "Bad request format (unnamed testbed)")
1157                    allocated[self.current_testbed] = \
1158                            allocated.get(self.current_testbed,0) + 1
1159                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1160                    if not os.path.exists(tb_dir):
1161                        try:
1162                            os.mkdir(tb_dir)
1163                        except IOError:
1164                            raise service_error(service_error.internal,
1165                                    "Cannot create %s" % tb_dir)
1166                    try:
1167                        self.testbed_file = open("%s/%s.%s.tcl" %
1168                                (tb_dir, self.eid, self.current_testbed), 'w')
1169                    except IOError:
1170                        self.testbed_file = None
1171                    return True
1172                else: return False
1173            else:
1174                m = self.end_testbed.match(line)
1175                if m != None:
1176                    if m.group(1) != self.current_testbed:
1177                        raise service_error(service_error.internal, 
1178                                "Mismatched testbed markers!?")
1179                    if self.testbed_file != None: 
1180                        self.testbed_file.close()
1181                        self.testbed_file = None
1182                    self.current_testbed = None
1183                elif self.testbed_file:
1184                    # Substitute variables and put the line into the local
1185                    # testbed file.
1186                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1187                            self.def_gwtype)
1188                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1189                            self.def_gwimage)
1190                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1191                            self.def_mgwstart)
1192                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1193                            self.def_mexpstart)
1194                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1195                            self.def_gwstart)
1196                    expstart = tbparams[self.current_testbed].get('expstart', 
1197                            self.def_expstart)
1198                    project = tbparams[self.current_testbed].get('project')
1199                    gwcmd = tbparams[self.current_testbed].get('gwcmd', 
1200                            self.def_gwcmd)
1201                    gwcmdparams = tbparams[self.current_testbed].get(\
1202                            'gwcmdparams', self.def_gwcmdparams)
1203                    mgwcmd = tbparams[self.current_testbed].get('mgwcmd', 
1204                            self.def_gwcmd)
1205                    mgwcmdparams = tbparams[self.current_testbed].get(\
1206                            'mgwcmdparams', self.def_gwcmdparams)
1207                    line = re.sub("GWTYPE", gwtype, line)
1208                    line = re.sub("GWIMAGE", gwimage, line)
1209                    if self.current_testbed == master:
1210                        line = re.sub("GWSTART", mgwstart, line)
1211                        line = re.sub("EXPSTART", mexpstart, line)
1212                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1213                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1214                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1215                    else:
1216                        line = re.sub("GWSTART", gwstart, line)
1217                        line = re.sub("EXPSTART", expstart, line)
1218                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1219                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1220                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1221                    #These expansions contain EID and PROJDIR.  NB these are
1222                    # local fedkit and gatewaykit, which are strings.
1223                    if self.fedkit:
1224                        line = re.sub("FEDKIT", self.fedkit, line)
1225                    if self.gatewaykit:
1226                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1227                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1228                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1229                    line = re.sub("EID", self.eid, line)
1230                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1231                            (project, self.eid), line)
1232                    print >>self.testbed_file, line
1233                return True
1234
1235    class allbeds:
1236        """
1237        Process the Allbeds section.  Get access to each federant and save the
1238        parameters in tbparams
1239        """
1240        def __init__(self, get_access):
1241            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1242            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1243            self.in_allbeds = False
1244            self.get_access = get_access
1245
1246        def __call__(self, line, user, tbparams, master, export_project,
1247                access_user):
1248            # Testbed access parameters
1249            if not self.in_allbeds:
1250                if self.begin_allbeds.match(line):
1251                    self.in_allbeds = True
1252                    return True
1253                else:
1254                    return False
1255            else:
1256                if self.end_allbeds.match(line):
1257                    self.in_allbeds = False
1258                else:
1259                    nodes = line.split('|')
1260                    tb = nodes.pop(0)
1261                    self.get_access(tb, nodes, user, tbparams, master,
1262                            export_project, access_user)
1263                return True
1264
1265    class gateways:
1266        def __init__(self, eid, master, tmpdir, gw_pubkey,
1267                gw_secretkey, copy_file, fedkit):
1268            self.begin_gateways = \
1269                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1270            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1271            self.current_gateways = None
1272            self.control_gateway = None
1273            self.active_end = { }
1274
1275            self.eid = eid
1276            self.master = master
1277            self.tmpdir = tmpdir
1278            self.gw_pubkey_base = gw_pubkey
1279            self.gw_secretkey_base = gw_secretkey
1280
1281            self.copy_file = copy_file
1282            self.fedkit = fedkit
1283
1284
1285        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1286                active_end, tbparams, dtb, myname, desthost, type):
1287            """
1288            Produce a gateway configuration file from a gateways line.
1289            """
1290
1291            sproject = tbparams[gw].get('project', 'project')
1292            dproject = tbparams[dtb].get('project', 'project')
1293            sdomain = ".%s.%s%s" % (eid, sproject,
1294                    tbparams[gw].get('domain', ".example.com"))
1295            ddomain = ".%s.%s%s" % (eid, dproject,
1296                    tbparams[dtb].get('domain', ".example.com"))
1297            boss = tbparams[master].get('boss', "boss")
1298            fs = tbparams[master].get('fs', "fs")
1299            event_server = "%s%s" % \
1300                    (tbparams[gw].get('eventserver', "event_server"),
1301                            tbparams[gw].get('domain', "example.com"))
1302            remote_event_server = "%s%s" % \
1303                    (tbparams[dtb].get('eventserver', "event_server"),
1304                            tbparams[dtb].get('domain', "example.com"))
1305            seer_control = "%s%s" % \
1306                    (tbparams[gw].get('control', "control"), sdomain)
1307            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1308
1309            if self.fedkit:
1310                remote_script_dir = "/usr/local/federation/bin"
1311                local_script_dir = "/usr/local/federation/bin"
1312            else:
1313                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1314                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1315
1316            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1317            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1318            tunnel_cfg = tbparams[gw].get("tun", "false")
1319
1320            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1321            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1322
1323            # translate to lower case so the `hostname` hack for specifying
1324            # configuration files works.
1325            conf_file = conf_file.lower();
1326            remote_conf_file = remote_conf_file.lower();
1327
1328            if dtb == master:
1329                active = "false"
1330            elif gw == master:
1331                active = "true"
1332            elif active_end.has_key('%s-%s' % (dtb, gw)):
1333                active = "false"
1334            else:
1335                active_end['%s-%s' % (gw, dtb)] = 1
1336                active = "true"
1337
1338            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1339            print >>gwconfig, "Active: %s" % active
1340            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1341            if tunnel_iface:
1342                print >>gwconfig, "Interface: %s" % tunnel_iface
1343            print >>gwconfig, "BossName: %s" % boss
1344            print >>gwconfig, "FsName: %s" % fs
1345            print >>gwconfig, "EventServerName: %s" % event_server
1346            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1347            print >>gwconfig, "SeerControl: %s" % seer_control
1348            print >>gwconfig, "Type: %s" % type
1349            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1350            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1351                    local_script_dir
1352            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1353            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1354            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1355                    (remote_conf_dir, remote_conf_file)
1356            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1357            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1358            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1359            gwconfig.close()
1360
1361            return active == "true"
1362
1363        def __call__(self, line, allocated, tbparams):
1364            # Process gateways
1365            if not self.current_gateways:
1366                m = self.begin_gateways.match(line)
1367                if m:
1368                    self.current_gateways = m.group(1)
1369                    if allocated.has_key(self.current_gateways):
1370                        # This test should always succeed
1371                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1372                        if not os.path.exists(tb_dir):
1373                            try:
1374                                os.mkdir(tb_dir)
1375                            except IOError:
1376                                raise service_error(service_error.internal,
1377                                        "Cannot create %s" % tb_dir)
1378                    else:
1379                        # XXX
1380                        self.log.error("[gateways]: Ignoring gateways for " + \
1381                                "unknown testbed %s" % self.current_gateways)
1382                        self.current_gateways = None
1383                    return True
1384                else:
1385                    return False
1386            else:
1387                m = self.end_gateways.match(line)
1388                if m :
1389                    if m.group(1) != self.current_gateways:
1390                        raise service_error(service_error.internal,
1391                                "Mismatched gateway markers!?")
1392                    if self.control_gateway:
1393                        try:
1394                            cc = open("%s/%s/client.conf" %
1395                                    (self.tmpdir, self.current_gateways), 'w')
1396                            print >>cc, "ControlGateway: %s" % \
1397                                    self.control_gateway
1398                            if tbparams[self.master].has_key('smbshare'):
1399                                print >>cc, "SMBSHare: %s" % \
1400                                        tbparams[self.master]['smbshare']
1401                            print >>cc, "ProjectUser: %s" % \
1402                                    tbparams[self.master]['user']
1403                            print >>cc, "ProjectName: %s" % \
1404                                    tbparams[self.master]['project']
1405                            print >>cc, "ExperimentID: %s/%s" % \
1406                                    ( tbparams[self.master]['project'], \
1407                                    self.eid )
1408                            cc.close()
1409                        except IOError:
1410                            raise service_error(service_error.internal,
1411                                    "Error creating client config")
1412                        # XXX: This seer specific file should disappear
1413                        try:
1414                            cc = open("%s/%s/seer.conf" %
1415                                    (self.tmpdir, self.current_gateways),
1416                                    'w')
1417                            if self.current_gateways != self.master:
1418                                print >>cc, "ControlNode: %s" % \
1419                                        self.control_gateway
1420                            print >>cc, "ExperimentID: %s/%s" % \
1421                                    ( tbparams[self.master]['project'], \
1422                                    self.eid )
1423                            cc.close()
1424                        except IOError:
1425                            raise service_error(service_error.internal,
1426                                    "Error creating seer config")
1427                    else:
1428                        debug.error("[gateways]: No control gateway for %s" %\
1429                                    self.current_gateways)
1430                    self.current_gateways = None
1431                else:
1432                    dtb, myname, desthost, type = line.split(" ")
1433
1434                    if type == "control" or type == "both":
1435                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1436                                self.eid, 
1437                                tbparams[self.current_gateways]['project'],
1438                                tbparams[self.current_gateways]['domain'])
1439                    try:
1440                        active = self.gateway_conf_file(self.current_gateways,
1441                                self.master, self.eid, self.gw_pubkey_base,
1442                                self.gw_secretkey_base,
1443                                self.active_end, tbparams, dtb, myname,
1444                                desthost, type)
1445                    except IOError, e:
1446                        raise service_error(service_error.internal,
1447                                "Failed to write config file for %s" % \
1448                                        self.current_gateway)
1449           
1450                    gw_pubkey = "%s/keys/%s" % \
1451                            (self.tmpdir, self.gw_pubkey_base)
1452                    gw_secretkey = "%s/keys/%s" % \
1453                            (self.tmpdir, self.gw_secretkey_base)
1454
1455                    pkfile = "%s/%s/%s" % \
1456                            ( self.tmpdir, self.current_gateways, 
1457                                    self.gw_pubkey_base)
1458                    skfile = "%s/%s/%s" % \
1459                            ( self.tmpdir, self.current_gateways, 
1460                                    self.gw_secretkey_base)
1461
1462                    if not os.path.exists(pkfile):
1463                        try:
1464                            self.copy_file(gw_pubkey, pkfile)
1465                        except IOError:
1466                            service_error(service_error.internal,
1467                                    "Failed to copy pubkey file")
1468
1469                    if active and not os.path.exists(skfile):
1470                        try:
1471                            self.copy_file(gw_secretkey, skfile)
1472                        except IOError:
1473                            service_error(service_error.internal,
1474                                    "Failed to copy secretkey file")
1475                return True
1476
1477    class shunt_to_file:
1478        """
1479        Simple class to write data between two regexps to a file.
1480        """
1481        def __init__(self, begin, end, filename):
1482            """
1483            Begin shunting on a match of begin, stop on end, send data to
1484            filename.
1485            """
1486            self.begin = re.compile(begin)
1487            self.end = re.compile(end)
1488            self.in_shunt = False
1489            self.file = None
1490            self.filename = filename
1491
1492        def __call__(self, line):
1493            """
1494            Call this on each line in the input that may be shunted.
1495            """
1496            if not self.in_shunt:
1497                if self.begin.match(line):
1498                    self.in_shunt = True
1499                    try:
1500                        self.file = open(self.filename, "w")
1501                    except:
1502                        self.file = None
1503                        raise
1504                    return True
1505                else:
1506                    return False
1507            else:
1508                if self.end.match(line):
1509                    if self.file: 
1510                        self.file.close()
1511                        self.file = None
1512                    self.in_shunt = False
1513                else:
1514                    if self.file:
1515                        print >>self.file, line
1516                return True
1517
1518    class shunt_to_list:
1519        """
1520        Same interface as shunt_to_file.  Data collected in self.list, one list
1521        element per line.
1522        """
1523        def __init__(self, begin, end):
1524            self.begin = re.compile(begin)
1525            self.end = re.compile(end)
1526            self.in_shunt = False
1527            self.list = [ ]
1528       
1529        def __call__(self, line):
1530            if not self.in_shunt:
1531                if self.begin.match(line):
1532                    self.in_shunt = True
1533                    return True
1534                else:
1535                    return False
1536            else:
1537                if self.end.match(line):
1538                    self.in_shunt = False
1539                else:
1540                    self.list.append(line)
1541                return True
1542
1543    class shunt_to_string:
1544        """
1545        Same interface as shunt_to_file.  Data collected in self.str, all in
1546        one string.
1547        """
1548        def __init__(self, begin, end):
1549            self.begin = re.compile(begin)
1550            self.end = re.compile(end)
1551            self.in_shunt = False
1552            self.str = ""
1553       
1554        def __call__(self, line):
1555            if not self.in_shunt:
1556                if self.begin.match(line):
1557                    self.in_shunt = True
1558                    return True
1559                else:
1560                    return False
1561            else:
1562                if self.end.match(line):
1563                    self.in_shunt = False
1564                else:
1565                    self.str += line
1566                return True
1567
1568    def create_experiment(self, req, fid):
1569        """
1570        The external interface to experiment creation called from the
1571        dispatcher.
1572
1573        Creates a working directory, splits the incoming description using the
1574        splitter script and parses out the avrious subsections using the
1575        lcasses above.  Once each sub-experiment is created, use pooled threads
1576        to instantiate them and start it all up.
1577        """
1578
1579        if not self.auth.check_attribute(fid, 'create'):
1580            raise service_error(service_error.access, "Create access denied")
1581
1582        try:
1583            tmpdir = tempfile.mkdtemp(prefix="split-")
1584        except IOError:
1585            raise service_error(service_error.internal, "Cannot create tmp dir")
1586
1587        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1588        gw_secretkey_base = "fed.%s" % self.ssh_type
1589        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1590        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1591        tclfile = tmpdir + "/experiment.tcl"
1592        tbparams = { }
1593        try:
1594            access_user = self.accessdb[fid]
1595        except KeyError:
1596            raise service_error(service_error.internal,
1597                    "Access map and authorizer out of sync in " + \
1598                            "create_experiment for fedid %s"  % fid)
1599
1600        pid = "dummy"
1601        gid = "dummy"
1602        # XXX
1603        fail_soft = False
1604
1605        try:
1606            os.mkdir(tmpdir+"/keys")
1607        except OSError:
1608            raise service_error(service_error.internal,
1609                    "Can't make temporary dir")
1610
1611        req = req.get('CreateRequestBody', None)
1612        if not req:
1613            raise service_error(service_error.req,
1614                    "Bad request format (no CreateRequestBody)")
1615        # The tcl parser needs to read a file so put the content into that file
1616        descr=req.get('experimentdescription', None)
1617        if descr:
1618            file_content=descr.get('ns2description', None)
1619            if file_content:
1620                try:
1621                    f = open(tclfile, 'w')
1622                    f.write(file_content)
1623                    f.close()
1624                except IOError:
1625                    raise service_error(service_error.internal,
1626                            "Cannot write temp experiment description")
1627            else:
1628                raise service_error(service_error.req, 
1629                        "Only ns2descriptions supported")
1630        else:
1631            raise service_error(service_error.req, "No experiment description")
1632
1633        if req.has_key('experimentID') and \
1634                req['experimentID'].has_key('localname'):
1635            eid = req['experimentID']['localname']
1636            self.state_lock.acquire()
1637            while (self.state.has_key(eid)):
1638                eid += random.choice(string.ascii_letters)
1639            # To avoid another thread picking this localname
1640            self.state[eid] = "placeholder"
1641            self.state_lock.release()
1642        else:
1643            eid = self.exp_stem
1644            for i in range(0,5):
1645                eid += random.choice(string.ascii_letters)
1646            self.state_lock.acquire()
1647            while (self.state.has_key(eid)):
1648                eid = self.exp_stem
1649                for i in range(0,5):
1650                    eid += random.choice(string.ascii_letters)
1651            # To avoid another thread picking this localname
1652            self.state[eid] = "placeholder"
1653            self.state_lock.release()
1654
1655        try: 
1656            # This catches exceptions to clear the placeholder if necessary
1657            try:
1658                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1659            except ValueError:
1660                raise service_error(service_error.server_config, 
1661                        "Bad key type (%s)" % self.ssh_type)
1662
1663            user = req.get('user', None)
1664            if user == None:
1665                raise service_error(service_error.req, "No user")
1666
1667            master = req.get('master', None)
1668            if not master:
1669                raise service_error(service_error.req,
1670                        "No master testbed label")
1671            export_project = req.get('exportProject', None)
1672            if not export_project:
1673                raise service_error(service_error.req, "No export project")
1674           
1675            if self.splitter_url:
1676                self.log.debug("Calling remote splitter at %s" % \
1677                        self.splitter_url)
1678                split_data = self.remote_splitter(self.splitter_url,
1679                        file_content, master)
1680            else:
1681                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1682                    str(self.muxmax), '-m', master]
1683
1684                if self.fedkit:
1685                    tclcmd.append('-k')
1686
1687                if self.gatewaykit:
1688                    tclcmd.append('-K')
1689
1690                tclcmd.extend([pid, gid, eid, tclfile])
1691
1692                self.log.debug("running local splitter %s", " ".join(tclcmd))
1693                tclparser = Popen(tclcmd, stdout=PIPE)
1694                split_data = tclparser.stdout
1695
1696            allocated = { }         # Testbeds we can access
1697            started = { }           # Testbeds where a sub-experiment started
1698                                # successfully
1699
1700            # Objects to parse the splitter output (defined above)
1701            parse_current_testbed = self.current_testbed(eid, tmpdir,
1702                    self.fedkit, self.gatewaykit)
1703            parse_allbeds = self.allbeds(self.get_access)
1704            parse_gateways = self.gateways(eid, master, tmpdir,
1705                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1706                    self.fedkit)
1707            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1708                        "^#\s+End\s+Vtopo")
1709            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1710                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1711            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1712                    "^#\s+End\s+tarfiles")
1713            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1714                    "^#\s+End\s+rpms")
1715
1716            # Working on the split data
1717            for line in split_data:
1718                line = line.rstrip()
1719                if parse_current_testbed(line, master, allocated, tbparams):
1720                    continue
1721                elif parse_allbeds(line, user, tbparams, master, export_project,
1722                        access_user):
1723                    continue
1724                elif parse_gateways(line, allocated, tbparams):
1725                    continue
1726                elif parse_vtopo(line):
1727                    continue
1728                elif parse_hostnames(line):
1729                    continue
1730                elif parse_tarfiles(line):
1731                    continue
1732                elif parse_rpms(line):
1733                    continue
1734                else:
1735                    raise service_error(service_error.internal, 
1736                            "Bad tcl parse? %s" % line)
1737            # Virtual topology and visualization
1738            vtopo = self.gentopo(parse_vtopo.str)
1739            if not vtopo:
1740                raise service_error(service_error.internal, 
1741                        "Failed to generate virtual topology")
1742
1743            vis = self.genviz(vtopo)
1744            if not vis:
1745                raise service_error(service_error.internal, 
1746                        "Failed to generate visualization")
1747           
1748            # save federant information
1749            for k in allocated.keys():
1750                tbparams[k]['federant'] = {\
1751                        'name': [ { 'localname' : eid} ],\
1752                        'emulab': tbparams[k]['emulab'],\
1753                        'allocID' : tbparams[k]['allocID'],\
1754                        'master' : k == master,\
1755                    }
1756
1757
1758            # Copy tarfiles and rpms needed at remote sites into a staging area
1759            try:
1760                if self.fedkit:
1761                    for t in self.fedkit:
1762                        parse_tarfiles.list.append(t[1])
1763                if self.gatewaykit:
1764                    for t in self.gatewaykit:
1765                        parse_tarfiles.list.append(t[1])
1766                for t in parse_tarfiles.list:
1767                    if not os.path.exists("%s/tarfiles" % tmpdir):
1768                        os.mkdir("%s/tarfiles" % tmpdir)
1769                    self.copy_file(t, "%s/tarfiles/%s" % \
1770                            (tmpdir, os.path.basename(t)))
1771                for r in parse_rpms.list:
1772                    if not os.path.exists("%s/rpms" % tmpdir):
1773                        os.mkdir("%s/rpms" % tmpdir)
1774                    self.copy_file(r, "%s/rpms/%s" % \
1775                            (tmpdir, os.path.basename(r)))
1776            except IOError, e:
1777                raise service_error(service_error.internal, 
1778                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1779
1780        except service_error, e:
1781            # If something goes wrong in the parse (usually an access error)
1782            # clear the placeholder state.  From here on out the code delays
1783            # exceptions.
1784            self.state_lock.acquire()
1785            del self.state[eid]
1786            self.state_lock.release()
1787            raise e
1788
1789        thread_pool = self.thread_pool(self.nthreads)
1790        threads = [ ]
1791
1792        for tb in [ k for k in allocated.keys() if k != master]:
1793            # Create and start a thread to start the segment, and save it to
1794            # get the return value later
1795            thread_pool.wait_for_slot()
1796            t  = self.pooled_thread(target=self.start_segment, 
1797                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1798                    pdata=thread_pool, trace_file=self.trace_file)
1799            threads.append(t)
1800            t.start()
1801
1802        # Wait until all finish
1803        thread_pool.wait_for_all_done()
1804
1805        # If none failed, start the master
1806        failed = [ t.getName() for t in threads if not t.rv ]
1807
1808        if len(failed) == 0:
1809            if not self.start_segment(master, eid, tbparams, tmpdir):
1810                failed.append(master)
1811
1812        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1813        # If one failed clean up, unless fail_soft is set
1814        if failed:
1815            if not fail_soft:
1816                thread_pool.clear()
1817                for tb in succeeded:
1818                    # Create and start a thread to stop the segment
1819                    thread_pool.wait_for_slot()
1820                    t  = self.pooled_thread(target=self.stop_segment, 
1821                            args=(tb, eid, tbparams), name=tb,
1822                            pdata=thread_pool, trace_file=self.trace_file)
1823                    t.start()
1824                # Wait until all finish
1825                thread_pool.wait_for_all_done()
1826
1827                # release the allocations
1828                for tb in tbparams.keys():
1829                    self.release_access(tb, tbparams[tb]['allocID'])
1830                # Remove the placeholder
1831                self.state_lock.acquire()
1832                del self.state[eid]
1833                self.state_lock.release()
1834
1835                raise service_error(service_error.federant,
1836                    "Swap in failed on %s" % ",".join(failed))
1837        else:
1838            self.log.info("[start_segment]: Experiment %s started" % eid)
1839
1840        # Generate an ID for the experiment (slice) and a certificate that the
1841        # allocator can use to prove they own it.  We'll ship it back through
1842        # the encrypted connection.
1843        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1844
1845        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1846
1847        # Walk up tmpdir, deleting as we go
1848        for path, dirs, files in os.walk(tmpdir, topdown=False):
1849            for f in files:
1850                os.remove(os.path.join(path, f))
1851            for d in dirs:
1852                os.rmdir(os.path.join(path, d))
1853        os.rmdir(tmpdir)
1854
1855        # The deepcopy prevents the allocation ID and other binaries from being
1856        # translated into other formats
1857        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1858                for tb in tbparams.keys() \
1859                    if tbparams[tb].has_key('federant') ],\
1860                    'vtopo': vtopo,\
1861                    'vis' : vis,
1862                    'experimentID' : [\
1863                            { 'fedid': copy.copy(expid) }, \
1864                            { 'localname': eid },\
1865                        ],\
1866                    'experimentAccess': { 'X509' : expcert },\
1867                }
1868        # remove the allocationID info from each federant
1869        for f in resp['federant']:
1870            if f.has_key('allocID'): del f['allocID']
1871
1872        # Insert the experiment into our state and update the disk copy
1873        self.state_lock.acquire()
1874        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1875                for tb in tbparams.keys() \
1876                    if tbparams[tb].has_key('federant') ],\
1877                    'vtopo': vtopo,\
1878                    'vis' : vis,
1879                    'owner': fid,
1880                    'experimentID' : [\
1881                            { 'fedid': expid }, { 'localname': eid },\
1882                        ],\
1883                }
1884        self.state[eid] = self.state[expid]
1885        if self.state_filename: self.write_state()
1886        self.state_lock.release()
1887
1888        self.auth.set_attribute(fid, expid)
1889        self.auth.set_attribute(expid, expid)
1890
1891        if not failed:
1892            return resp
1893        else:
1894            raise service_error(service_error.partial, \
1895                    "Partial swap in on %s" % ",".join(succeeded))
1896
1897    def check_experiment_access(self, fid, key):
1898        """
1899        Confirm that the fid has access to the experiment.  Though a request
1900        may be made in terms of a local name, the access attribute is always
1901        the experiment's fedid.
1902        """
1903        if not isinstance(key, fedid):
1904            self.state_lock.acquire()
1905            if self.state.has_key(key):
1906                if isinstance(self.state[key], dict):
1907                    try:
1908                        kl = [ f['fedid'] for f in \
1909                                self.state[key]['experimentID']\
1910                                    if f.has_key('fedid') ]
1911                    except KeyError:
1912                        self.state_lock.release()
1913                        raise service_error(service_error.internal, 
1914                                "No fedid for experiment %s when checking " +\
1915                                        "access(!?)" % key)
1916                    if len(kl) == 1:
1917                        key = kl[0]
1918                    else:
1919                        self.state_lock.release()
1920                        raise service_error(service_error.internal, 
1921                                "multiple fedids for experiment %s when " +\
1922                                        "checking access(!?)" % key)
1923                elif isinstance(self.state[key], str):
1924                    self.state_lock.release()
1925                    raise service_error(service_error.internal, 
1926                            ("experiment %s is placeholder.  " +\
1927                                    "Creation in progress or aborted oddly") \
1928                                    % key)
1929                else:
1930                    self.state_lock.release()
1931                    raise service_error(service_error.internal, 
1932                            "Unexpected state for %s" % key)
1933
1934            else:
1935                self.state_lock.release()
1936                raise service_error(service_error.access, "Access Denied")
1937            self.state_lock.release()
1938
1939        if self.auth.check_attribute(fid, key):
1940            return True
1941        else:
1942            raise service_error(service_error.access, "Access Denied")
1943
1944
1945
1946    def get_vtopo(self, req, fid):
1947        """
1948        Return the stored virtual topology for this experiment
1949        """
1950        rv = None
1951
1952        req = req.get('VtopoRequestBody', None)
1953        if not req:
1954            raise service_error(service_error.req,
1955                    "Bad request format (no VtopoRequestBody)")
1956        exp = req.get('experiment', None)
1957        if exp:
1958            if exp.has_key('fedid'):
1959                key = exp['fedid']
1960                keytype = "fedid"
1961            elif exp.has_key('localname'):
1962                key = exp['localname']
1963                keytype = "localname"
1964            else:
1965                raise service_error(service_error.req, "Unknown lookup type")
1966        else:
1967            raise service_error(service_error.req, "No request?")
1968
1969        self.check_experiment_access(fid, key)
1970
1971        self.state_lock.acquire()
1972        if self.state.has_key(key):
1973            rv = { 'experiment' : {keytype: key },\
1974                    'vtopo': self.state[key]['vtopo'],\
1975                }
1976        self.state_lock.release()
1977
1978        if rv: return rv
1979        else: raise service_error(service_error.req, "No such experiment")
1980
1981    def get_vis(self, req, fid):
1982        """
1983        Return the stored visualization for this experiment
1984        """
1985        rv = None
1986
1987        req = req.get('VisRequestBody', None)
1988        if not req:
1989            raise service_error(service_error.req,
1990                    "Bad request format (no VisRequestBody)")
1991        exp = req.get('experiment', None)
1992        if exp:
1993            if exp.has_key('fedid'):
1994                key = exp['fedid']
1995                keytype = "fedid"
1996            elif exp.has_key('localname'):
1997                key = exp['localname']
1998                keytype = "localname"
1999            else:
2000                raise service_error(service_error.req, "Unknown lookup type")
2001        else:
2002            raise service_error(service_error.req, "No request?")
2003
2004        self.check_experiment_access(fid, key)
2005
2006        self.state_lock.acquire()
2007        if self.state.has_key(key):
2008            rv =  { 'experiment' : {keytype: key },\
2009                    'vis': self.state[key]['vis'],\
2010                    }
2011        self.state_lock.release()
2012
2013        if rv: return rv
2014        else: raise service_error(service_error.req, "No such experiment")
2015
2016    def get_info(self, req, fid):
2017        """
2018        Return all the stored info about this experiment
2019        """
2020        rv = None
2021
2022        req = req.get('InfoRequestBody', None)
2023        if not req:
2024            raise service_error(service_error.req,
2025                    "Bad request format (no VisRequestBody)")
2026        exp = req.get('experiment', None)
2027        if exp:
2028            if exp.has_key('fedid'):
2029                key = exp['fedid']
2030                keytype = "fedid"
2031            elif exp.has_key('localname'):
2032                key = exp['localname']
2033                keytype = "localname"
2034            else:
2035                raise service_error(service_error.req, "Unknown lookup type")
2036        else:
2037            raise service_error(service_error.req, "No request?")
2038
2039        self.check_experiment_access(fid, key)
2040
2041        # The state may be massaged by the service function that called
2042        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2043        # state.
2044        self.state_lock.acquire()
2045        if self.state.has_key(key):
2046            rv = copy.deepcopy(self.state[key])
2047        self.state_lock.release()
2048        # Remove the owner info
2049        del rv['owner']
2050        # remove the allocationID info from each federant
2051        for f in rv['federant']:
2052            if f.has_key('allocID'): del f['allocID']
2053
2054        if rv: return rv
2055        else: raise service_error(service_error.req, "No such experiment")
2056
2057
2058    def terminate_experiment(self, req, fid):
2059        """
2060        Swap this experiment out on the federants and delete the shared
2061        information
2062        """
2063        tbparams = { }
2064        req = req.get('TerminateRequestBody', None)
2065        if not req:
2066            raise service_error(service_error.req,
2067                    "Bad request format (no TerminateRequestBody)")
2068        exp = req.get('experiment', None)
2069        if exp:
2070            if exp.has_key('fedid'):
2071                key = exp['fedid']
2072                keytype = "fedid"
2073            elif exp.has_key('localname'):
2074                key = exp['localname']
2075                keytype = "localname"
2076            else:
2077                raise service_error(service_error.req, "Unknown lookup type")
2078        else:
2079            raise service_error(service_error.req, "No request?")
2080
2081        self.check_experiment_access(fid, key)
2082
2083        self.state_lock.acquire()
2084        fed_exp = self.state.get(key, None)
2085
2086        if fed_exp:
2087            # This branch of the conditional holds the lock to generate a
2088            # consistent temporary tbparams variable to deallocate experiments.
2089            # It releases the lock to do the deallocations and reacquires it to
2090            # remove the experiment state when the termination is complete.
2091            ids = []
2092            #  experimentID is a list of dicts that are self-describing
2093            #  identifiers.  This finds all the fedids and localnames - the
2094            #  keys of self.state - and puts them into ids.
2095            for id in fed_exp.get('experimentID', []):
2096                if id.has_key('fedid'): ids.append(id['fedid'])
2097                if id.has_key('localname'): ids.append(id['localname'])
2098
2099            # Construct enough of the tbparams to make the stop_segment calls
2100            # work
2101            for fed in fed_exp['federant']:
2102                try:
2103                    for e in fed['name']:
2104                        eid = e.get('localname', None)
2105                        if eid: break
2106                    else:
2107                        continue
2108
2109                    p = fed['emulab']['project']
2110
2111                    project = p['name']['localname']
2112                    tb = p['testbed']['localname']
2113                    user = p['user'][0]['userID']['localname']
2114
2115                    domain = fed['emulab']['domain']
2116                    host  = fed['emulab']['ops']
2117                    aid = fed['allocID']
2118                except KeyError, e:
2119                    continue
2120                tbparams[tb] = {\
2121                        'user': user,\
2122                        'domain': domain,\
2123                        'project': project,\
2124                        'host': host,\
2125                        'eid': eid,\
2126                        'aid': aid,\
2127                    }
2128            self.state_lock.release()
2129
2130            # Stop everyone.
2131            thread_pool = self.thread_pool(self.nthreads)
2132            for tb in tbparams.keys():
2133                # Create and start a thread to stop the segment
2134                thread_pool.wait_for_slot()
2135                t  = self.pooled_thread(target=self.stop_segment, 
2136                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2137                        pdata=thread_pool, trace_file=self.trace_file)
2138                t.start()
2139            # Wait for completions
2140            thread_pool.wait_for_all_done()
2141
2142            # release the allocations
2143            for tb in tbparams.keys():
2144                self.release_access(tb, tbparams[tb]['aid'])
2145
2146            # Remove the terminated experiment
2147            self.state_lock.acquire()
2148            for id in ids:
2149                if self.state.has_key(id): del self.state[id]
2150
2151            if self.state_filename: self.write_state()
2152            self.state_lock.release()
2153
2154            return { 'experiment': exp }
2155        else:
2156            # Don't forget to release the lock
2157            self.state_lock.release()
2158            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.