source: fedd/federation/experiment_control.py @ f9665d1

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since f9665d1 was f9665d1, checked in by Ted Faber <faber@…>, 15 years ago

strict host key checking - having fedd ask for keys when contacting a new host is a no-no

  • Property mode set to 100644
File size: 61.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = config.get("experiment_control", "cert_file")
152        if self.cert_file:
153            self.cert_pwd = config.get("experiment_control", "cert_pwd")
154        else:
155            self.cert_file = config.get("globals", "cert_file")
156            self.cert_pwd = config.get("globals", "cert_pwd")
157
158        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
159                or config.get("globals", "trusted_certs")
160
161        self.exp_stem = "fed-stem"
162        self.log = logging.getLogger("fedd.experiment_control")
163        set_log_level(config, "experiment_control", self.log)
164        self.muxmax = 2
165        self.nthreads = 2
166        self.randomize_experiments = False
167
168        self.scp_exec = "/usr/bin/scp"
169        self.splitter = None
170        self.ssh_exec="/usr/bin/ssh"
171        self.ssh_keygen = "/usr/bin/ssh-keygen"
172        self.ssh_identity_file = None
173
174
175        self.debug = config.getboolean("experiment_control", "create_debug")
176        self.state_filename = config.get("experiment_control", 
177                "experiment_state")
178        self.splitter_url = config.get("experiment_control", "splitter_uri")
179        self.fedkit = config.get("experiment_control", "fedkit")
180        accessdb_file = config.get("experiment_control", "accessdb")
181
182        self.ssh_pubkey_file = config.get("experiment_control", 
183                "ssh_pubkey_file")
184        self.ssh_privkey_file = config.get("experiment_control",
185                "ssh_privkey_file")
186        # NB for internal master/slave ops, not experiment setup
187        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
188        self.state = { }
189        self.state_lock = Lock()
190        self.tclsh = "/usr/local/bin/otclsh"
191        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
192                config.get("experiment_control", "tcl_splitter",
193                        "/usr/testbed/lib/ns2ir/parse.tcl")
194        mapdb_file = config.get("experiment_control", "mapdb")
195        self.trace_file = sys.stderr
196
197        self.def_expstart = \
198                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
199                "/tmp/federate";
200        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
201                "FEDDIR/hosts";
202        self.def_gwstart = \
203                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
204                "/tmp/bridge.log";
205        self.def_mgwstart = \
206                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
207                "/tmp/bridge.log";
208        self.def_gwimage = "FBSD61-TUNNEL2";
209        self.def_gwtype = "pc";
210        self.local_access = { }
211
212        if auth:
213            self.auth = auth
214        else:
215            self.log.error(\
216                    "[access]: No authorizer initialized, creating local one.")
217            auth = authorizer()
218
219
220        if self.ssh_pubkey_file:
221            try:
222                f = open(self.ssh_pubkey_file, 'r')
223                self.ssh_pubkey = f.read()
224                f.close()
225            except IOError:
226                raise service_error(service_error.internal,
227                        "Cannot read sshpubkey")
228        else:
229            raise service_error(service_error.internal, 
230                    "No SSH public key file?")
231
232        if not self.ssh_privkey_file:
233            raise service_error(service_error.internal, 
234                    "No SSH public key file?")
235
236
237        if mapdb_file:
238            self.read_mapdb(mapdb_file)
239        else:
240            self.log.warn("[experiment_control] No testbed map, using defaults")
241            self.tbmap = { 
242                    'deter':'https://users.isi.deterlab.net:23235',
243                    'emulab':'https://users.isi.deterlab.net:23236',
244                    'ucb':'https://users.isi.deterlab.net:23237',
245                    }
246
247        if accessdb_file:
248                self.read_accessdb(accessdb_file)
249        else:
250            raise service_error(service_error.internal,
251                    "No accessdb specified in config")
252
253        # Grab saved state.  OK to do this w/o locking because it's read only
254        # and only one thread should be in existence that can see self.state at
255        # this point.
256        if self.state_filename:
257            self.read_state()
258
259        # Dispatch tables
260        self.soap_services = {\
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'Terminate': soap_handler('Terminate', 
266                    self.terminate_experiment),
267        }
268
269        self.xmlrpc_services = {\
270                'Create': xmlrpc_handler('Create', self.create_experiment),
271                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
272                'Vis': xmlrpc_handler('Vis', self.get_vis),
273                'Info': xmlrpc_handler('Info', self.get_info),
274                'Terminate': xmlrpc_handler('Terminate',
275                    self.terminate_experiment),
276        }
277
278    def copy_file(self, src, dest, size=1024):
279        """
280        Exceedingly simple file copy.
281        """
282        s = open(src,'r')
283        d = open(dest, 'w')
284
285        buf = "x"
286        while buf != "":
287            buf = s.read(size)
288            d.write(buf)
289        s.close()
290        d.close()
291
292    # Call while holding self.state_lock
293    def write_state(self):
294        """
295        Write a new copy of experiment state after copying the existing state
296        to a backup.
297
298        State format is a simple pickling of the state dictionary.
299        """
300        if os.access(self.state_filename, os.W_OK):
301            self.copy_file(self.state_filename, \
302                    "%s.bak" % self.state_filename)
303        try:
304            f = open(self.state_filename, 'w')
305            pickle.dump(self.state, f)
306        except IOError, e:
307            self.log.error("Can't write file %s: %s" % \
308                    (self.state_filename, e))
309        except pickle.PicklingError, e:
310            self.log.error("Pickling problem: %s" % e)
311        except TypeError, e:
312            self.log.error("Pickling problem (TypeError): %s" % e)
313
314    # Call while holding self.state_lock
315    def read_state(self):
316        """
317        Read a new copy of experiment state.  Old state is overwritten.
318
319        State format is a simple pickling of the state dictionary.
320        """
321        try:
322            f = open(self.state_filename, "r")
323            self.state = pickle.load(f)
324            self.log.debug("[read_state]: Read state from %s" % \
325                    self.state_filename)
326        except IOError, e:
327            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
328                    % (self.state_filename, e))
329        except pickle.UnpicklingError, e:
330            self.log.warning(("[read_state]: No saved state: " + \
331                    "Unpickling failed: %s") % e)
332       
333        for k in self.state.keys():
334            try:
335                # This list should only have one element in it, but phrasing it
336                # as a for loop doesn't cost much, really.  We have to find the
337                # fedid elements anyway.
338                for eid in [ f['fedid'] \
339                        for f in self.state[k]['experimentID']\
340                            if f.has_key('fedid') ]:
341                    self.auth.set_attribute(self.state[k]['owner'], eid)
342            except KeyError, e:
343                self.log.warning("[read_state]: State ownership or identity " +\
344                        "misformatted in %s: %s" % (self.state_filename, e))
345
346
347    def read_accessdb(self, accessdb_file):
348        """
349        Read the mapping from fedids that can create experiments to their name
350        in the 3-level access namespace.  All will be asserted from this
351        testbed and can include the local username and porject that will be
352        asserted on their behalf by this fedd.  Each fedid is also added to the
353        authorization system with the "create" attribute.
354        """
355        self.accessdb = {}
356        # These are the regexps for parsing the db
357        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
358        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
359                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
360        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
361                "\s*->\s*(" + name_expr + ")\s*$")
362        lineno = 0
363
364        # Parse the mappings and store in self.authdb, a dict of
365        # fedid -> (proj, user)
366        try:
367            f = open(accessdb_file, "r")
368            for line in f:
369                lineno += 1
370                line = line.strip()
371                if len(line) == 0 or line.startswith('#'):
372                    continue
373                m = project_line.match(line)
374                if m:
375                    fid = fedid(hexstr=m.group(1))
376                    project, user = m.group(2,3)
377                    if not self.accessdb.has_key(fid):
378                        self.accessdb[fid] = []
379                    self.accessdb[fid].append((project, user))
380                    continue
381
382                m = user_line.match(line)
383                if m:
384                    fid = fedid(hexstr=m.group(1))
385                    project = None
386                    user = m.group(2)
387                    if not self.accessdb.has_key(fid):
388                        self.accessdb[fid] = []
389                    self.accessdb[fid].append((project, user))
390                    continue
391                self.log.warn("[experiment_control] Error parsing access " +\
392                        "db %s at line %d" %  (accessdb_file, lineno))
393        except IOError:
394            raise service_error(service_error.internal,
395                    "Error opening/reading %s as experiment " +\
396                            "control accessdb" %  accessdb_file)
397        f.close()
398
399        # Initialize the authorization attributes
400        for fid in self.accessdb.keys():
401            self.auth.set_attribute(fid, 'create')
402
403    def read_mapdb(self, file):
404        """
405        Read a simple colon separated list of mappings for the
406        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
407        """
408
409        self.tbmap = { }
410        lineno =0
411        try:
412            f = open(file, "r")
413            for line in f:
414                lineno += 1
415                line = line.strip()
416                if line.startswith('#') or len(line) == 0:
417                    continue
418                try:
419                    label, url = line.split(':', 1)
420                    self.tbmap[label] = url
421                except ValueError, e:
422                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
423                            "map db: %s %s" % (lineno, line, e))
424        except IOError, e:
425            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
426                    "open %s: %s" % (file, e))
427        f.close()
428
429    def scp_file(self, file, user, host, dest=""):
430        """
431        scp a file to the remote host.  If debug is set the action is only
432        logged.
433        """
434
435        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
436                '-o', 'StrictHostKeyChecking yes', '-i', 
437                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
438        rv = 0
439
440        try:
441            dnull = open("/dev/null", "w")
442        except IOError:
443            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
444            dnull = Null
445
446        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
447        if not self.debug:
448            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
449
450        return rv == 0
451
452    def ssh_cmd(self, user, host, cmd, wname=None):
453        """
454        Run a remote command on host as user.  If debug is set, the action is
455        only logged.
456        """
457        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
458                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
459                (self.ssh_exec, self.ssh_privkey_file, 
460                        user, host, cmd)
461
462        try:
463            dnull = open("/dev/null", "r")
464        except IOError:
465            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
466            dnull = Null
467
468        self.log.debug("[ssh_cmd]: %s" % sh_str)
469        if not self.debug:
470            if dnull:
471                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
472            else:
473                sub = Popen(sh_str, shell=True)
474            return sub.wait() == 0
475        else:
476            return True
477
478    def ship_configs(self, host, user, src_dir, dest_dir):
479        """
480        Copy federant-specific configuration files to the federant.
481        """
482        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
483            return False
484        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
485            return False
486
487        for f in os.listdir(src_dir):
488            if os.path.isdir(f):
489                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
490                        "%s/%s" % (dest_dir, f)):
491                    return False
492            else:
493                if not self.scp_file("%s/%s" % (src_dir, f), 
494                        user, host, dest_dir):
495                    return False
496        return True
497
498    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
499        """
500        Start a sub-experiment on a federant.
501
502        Get the current state, modify or create as appropriate, ship data and
503        configs and start the experiment.  There are small ordering differences
504        based on the initial state of the sub-experiment.
505        """
506        # ops node in the federant
507        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
508        user = tbparams[tb]['user']     # federant user
509        pid = tbparams[tb]['project']   # federant project
510        # XXX
511        base_confs = ( "hosts",)
512        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
513        # command to test experiment state
514        expinfo_exec = "/usr/testbed/bin/expinfo" 
515        # Configuration directories on the remote machine
516        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
517        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
518        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
519        # Regular expressions to parse the expinfo response
520        state_re = re.compile("State:\s+(\w+)")
521        no_exp_re = re.compile("^No\s+such\s+experiment")
522        state = None    # Experiment state parsed from expinfo
523        # The expinfo ssh command.  Note the identity restriction to use only
524        # the identity provided in the pubkey given.
525        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
526                'StrictHostKeyChecking yes', '-i', 
527                self.ssh_privkey_file, "%s@%s" % (user, host), 
528                expinfo_exec, pid, eid]
529
530        # Get status
531        self.log.debug("[start_segment]: %s"% " ".join(cmd))
532        dev_null = None
533        try:
534            dev_null = open("/dev/null", "a")
535        except IOError, e:
536            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
537
538        if self.debug:
539            state = 'swapped'
540            rv = 0
541        else:
542            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
543            for line in status.stdout:
544                m = state_re.match(line)
545                if m: state = m.group(1)
546                else:
547                    m = no_exp_re.match(line)
548                    if m: state = "none"
549            rv = status.wait()
550
551        # If the experiment is not present the subcommand returns a non-zero
552        # return value.  If we successfully parsed a "none" outcome, ignore the
553        # return code.
554        if rv != 0 and state != "none":
555            raise service_error(service_error.internal,
556                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
557
558        self.log.debug("[start_segment]: %s: %s" % (tb, state))
559        self.log.info("[start_segment]:transferring experiment to %s" % tb)
560
561        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
562            return False
563        # Clear the federation config dirs
564        if not self.ssh_cmd(user, host, 
565                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
566            return False
567        # Clear and create the tarfiles and rpm directories
568        for d in (tarfiles_dir, rpms_dir):
569            if not self.ssh_cmd(user, host, 
570                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
571                return False
572            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
573                    "create tarfiles"):
574                return False
575       
576        if state == 'active':
577            # Create the federation config dirs (do not move outside the
578            # conditional.  Happens later in new expriment creation)
579            if not self.ssh_cmd(user, host, 
580                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
581                return False
582            # Remote experiment is active.  Modify it.
583            for f in base_confs:
584                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
585                        "%s/%s" % (proj_dir, f)):
586                    return False
587            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
588                    proj_dir):
589                return False
590            if os.path.isdir("%s/tarfiles" % tmpdir):
591                if not self.ship_configs(host, user,
592                        "%s/tarfiles" % tmpdir, tarfiles_dir):
593                    return False
594            if os.path.isdir("%s/rpms" % tmpdir):
595                if not self.ship_configs(host, user,
596                        "%s/rpms" % tmpdir, tarfiles_dir):
597                    return False
598            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
599            if not self.ssh_cmd(user, host,
600                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
601                            (pid, eid, tclfile), "modexp"):
602                return False
603            return True
604        elif state == "swapped":
605            # Create the federation config dirs (do not move outside the
606            # conditional.  Happens later in new expriment creation)
607            if not self.ssh_cmd(user, host, 
608                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
609                return False
610            # Remote experiment swapped out.  Modify it and swap it in.
611            for f in base_confs:
612                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
613                        "%s/%s" % (proj_dir, f)):
614                    return False
615            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
616                    proj_dir):
617                return False
618            if os.path.isdir("%s/tarfiles" % tmpdir):
619                if not self.ship_configs(host, user,
620                        "%s/tarfiles" % tmpdir, tarfiles_dir):
621                    return False
622            if os.path.isdir("%s/rpms" % tmpdir):
623                if not self.ship_configs(host, user,
624                        "%s/rpms" % tmpdir, tarfiles_dir):
625                    return False
626            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
627            if not self.ssh_cmd(user, host,
628                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
629                    "modexp"):
630                return False
631            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
632            if not self.ssh_cmd(user, host,
633                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
634                    "swapexp"):
635                return False
636            return True
637        elif state == "none":
638            # No remote experiment.  Create one.  We do this in 2 steps so we
639            # can put the configuration files and scripts into the new
640            # experiment directories.
641
642            # Tarfiles must be present for creation to work
643            if os.path.isdir("%s/tarfiles" % tmpdir):
644                if not self.ship_configs(host, user,
645                        "%s/tarfiles" % tmpdir, tarfiles_dir):
646                    return False
647            if os.path.isdir("%s/rpms" % tmpdir):
648                if not self.ship_configs(host, user,
649                        "%s/rpms" % tmpdir, tarfiles_dir):
650                    return False
651            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
652            if not self.ssh_cmd(user, host,
653                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
654                            (pid, eid, tclfile), "startexp"):
655                return False
656            # Create the federation config dirs (do not move outside the
657            # conditional.)
658            if not self.ssh_cmd(user, host, 
659                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
660                return False
661            # After startexp the per-experiment directories exist
662            for f in base_confs:
663                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
664                        "%s/%s" % (proj_dir, f)):
665                    return False
666            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
667                    proj_dir):
668                return False
669            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
670            if not self.ssh_cmd(user, host,
671                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
672                    "swapexp"):
673                return False
674            return True
675        else:
676            self.log.debug("[start_segment]:unknown state %s" % state)
677            return False
678
679    def stop_segment(self, tb, eid, tbparams):
680        """
681        Stop a sub experiment by calling swapexp on the federant
682        """
683        user = tbparams[tb]['user']
684        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
685        pid = tbparams[tb]['project']
686
687        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
688        return self.ssh_cmd(user, host,
689                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
690
691       
692    def generate_ssh_keys(self, dest, type="rsa" ):
693        """
694        Generate a set of keys for the gateways to use to talk.
695
696        Keys are of type type and are stored in the required dest file.
697        """
698        valid_types = ("rsa", "dsa")
699        t = type.lower();
700        if t not in valid_types: raise ValueError
701        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
702
703        try:
704            trace = open("/dev/null", "w")
705        except IOError:
706            raise service_error(service_error.internal,
707                    "Cannot open /dev/null??");
708
709        # May raise CalledProcessError
710        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
711        rv = call(cmd, stdout=trace, stderr=trace)
712        if rv != 0:
713            raise service_error(service_error.internal, 
714                    "Cannot generate nonce ssh keys.  %s return code %d" \
715                            % (self.ssh_keygen, rv))
716
717    def gentopo(self, str):
718        """
719        Generate the topology dtat structure from the splitter's XML
720        representation of it.
721
722        The topology XML looks like:
723            <experiment>
724                <nodes>
725                    <node><vname></vname><ips>ip1:ip2</ips></node>
726                </nodes>
727                <lans>
728                    <lan>
729                        <vname></vname><vnode></vnode><ip></ip>
730                        <bandwidth></bandwidth><member>node:port</member>
731                    </lan>
732                </lans>
733        """
734        class topo_parse:
735            """
736            Parse the topology XML and create the dats structure.
737            """
738            def __init__(self):
739                # Typing of the subelements for data conversion
740                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
741                self.int_subelements = ( 'bandwidth',)
742                self.float_subelements = ( 'delay',)
743                # The final data structure
744                self.nodes = [ ]
745                self.lans =  [ ]
746                self.topo = { \
747                        'node': self.nodes,\
748                        'lan' : self.lans,\
749                    }
750                self.element = { }  # Current element being created
751                self.chars = ""     # Last text seen
752
753            def end_element(self, name):
754                # After each sub element the contents is added to the current
755                # element or to the appropriate list.
756                if name == 'node':
757                    self.nodes.append(self.element)
758                    self.element = { }
759                elif name == 'lan':
760                    self.lans.append(self.element)
761                    self.element = { }
762                elif name in self.str_subelements:
763                    self.element[name] = self.chars
764                    self.chars = ""
765                elif name in self.int_subelements:
766                    self.element[name] = int(self.chars)
767                    self.chars = ""
768                elif name in self.float_subelements:
769                    self.element[name] = float(self.chars)
770                    self.chars = ""
771
772            def found_chars(self, data):
773                self.chars += data.rstrip()
774
775
776        tp = topo_parse();
777        parser = xml.parsers.expat.ParserCreate()
778        parser.EndElementHandler = tp.end_element
779        parser.CharacterDataHandler = tp.found_chars
780
781        parser.Parse(str)
782
783        return tp.topo
784       
785
786    def genviz(self, topo):
787        """
788        Generate the visualization the virtual topology
789        """
790
791        neato = "/usr/local/bin/neato"
792        # These are used to parse neato output and to create the visualization
793        # file.
794        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
795        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
796                "%s</type></node>"
797
798        try:
799            # Node names
800            nodes = [ n['vname'] for n in topo['node'] ]
801            topo_lans = topo['lan']
802        except KeyError:
803            raise service_error(service_error.internal, "Bad topology")
804
805        lans = { }
806        links = { }
807
808        # Walk through the virtual topology, organizing the connections into
809        # 2-node connections (links) and more-than-2-node connections (lans).
810        # When a lan is created, it's added to the list of nodes (there's a
811        # node in the visualization for the lan).
812        for l in topo_lans:
813            if links.has_key(l['vname']):
814                if len(links[l['vname']]) < 2:
815                    links[l['vname']].append(l['vnode'])
816                else:
817                    nodes.append(l['vname'])
818                    lans[l['vname']] = links[l['vname']]
819                    del links[l['vname']]
820                    lans[l['vname']].append(l['vnode'])
821            elif lans.has_key(l['vname']):
822                lans[l['vname']].append(l['vnode'])
823            else:
824                links[l['vname']] = [ l['vnode'] ]
825
826
827        # Open up a temporary file for dot to turn into a visualization
828        try:
829            df, dotname = tempfile.mkstemp()
830            dotfile = os.fdopen(df, 'w')
831        except IOError:
832            raise service_error(service_error.internal,
833                    "Failed to open file in genviz")
834
835        # Generate a dot/neato input file from the links, nodes and lans
836        try:
837            print >>dotfile, "graph G {"
838            for n in nodes:
839                print >>dotfile, '\t"%s"' % n
840            for l in links.keys():
841                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
842            for l in lans.keys():
843                for n in lans[l]:
844                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
845            print >>dotfile, "}"
846            dotfile.close()
847        except TypeError:
848            raise service_error(service_error.internal,
849                    "Single endpoint link in vtopo")
850        except IOError:
851            raise service_error(service_error.internal, "Cannot write dot file")
852
853        # Use dot to create a visualization
854        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
855                '-Gpack=true', dotname], stdout=PIPE)
856
857        # Translate dot to vis format
858        vis_nodes = [ ]
859        vis = { 'node': vis_nodes }
860        for line in dot.stdout:
861            m = vis_re.match(line)
862            if m:
863                vn = m.group(1)
864                vis_node = {'name': vn, \
865                        'x': float(m.group(2)),\
866                        'y' : float(m.group(3)),\
867                    }
868                if vn in links.keys() or vn in lans.keys():
869                    vis_node['type'] = 'lan'
870                else:
871                    vis_node['type'] = 'node'
872                vis_nodes.append(vis_node)
873        rv = dot.wait()
874
875        os.remove(dotname)
876        if rv == 0 : return vis
877        else: return None
878
879    def get_access(self, tb, nodes, user, tbparam, master, export_project,
880            access_user):
881        """
882        Get access to testbed through fedd and set the parameters for that tb
883        """
884
885        # XXX This is needless complexity.  It must vanish.
886        translate_attr = {
887            'slavenodestartcmd': 'expstart',
888            'slaveconnectorstartcmd': 'gwstart',
889            'masternodestartcmd': 'mexpstart',
890            'masterconnectorstartcmd': 'mgwstart',
891            'connectorimage': 'gwimage',
892            'connectortype': 'gwtype',
893            'tunnelcfg': 'tun',
894            'tunnelinterface': 'tunnelinterface',
895            'smbshare': 'smbshare',
896        }
897
898        uri = self.tbmap.get(tb, None)
899        if not uri:
900            raise service_error(serice_error.server_config, 
901                    "Unknown testbed: %s" % tb)
902
903        # currently this lumps all users into one service access group
904        service_keys = [ a for u in user \
905                for a in u.get('access', []) \
906                    if a.has_key('sshPubkey')]
907
908        if len(service_keys) == 0:
909            raise service_error(service_error.req, 
910                    "Must have at least one SSH pubkey for services")
911
912
913        for p, u in access_user:
914            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
915                    "to %s") %  ((p or "None"), u, uri))
916
917            if p:
918                # Request with user and project specified
919                req = {\
920                        'destinationTestbed' : { 'uri' : uri },
921                        'project': { 
922                            'name': {'localname': p},
923                            'user': [ {'userID': { 'localname': u } } ],
924                            },
925                        'user':  user,
926                        'allocID' : { 'localname': 'test' },
927                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
928                        'serviceAccess' : service_keys
929                    }
930            else:
931                # Request with only user specified
932                req = {\
933                        'destinationTestbed' : { 'uri' : uri },
934                        'user':  [ {'userID': { 'localname': u } } ],
935                        'allocID' : { 'localname': 'test' },
936                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
937                        'serviceAccess' : service_keys
938                    }
939
940            if tb == master:
941                # NB, the export_project parameter is a dict that includes
942                # the type
943                req['exportProject'] = export_project
944
945            # node resources if any
946            if nodes != None and len(nodes) > 0:
947                rnodes = [ ]
948                for n in nodes:
949                    rn = { }
950                    image, hw, count = n.split(":")
951                    if image: rn['image'] = [ image ]
952                    if hw: rn['hardware'] = [ hw ]
953                    if count and int(count) >0 : rn['count'] = int(count)
954                    rnodes.append(rn)
955                req['resources']= { }
956                req['resources']['node'] = rnodes
957
958            try:
959                if self.local_access.has_key(uri):
960                    # Local access call
961                    req = { 'RequestAccessRequestBody' : req }
962                    r = self.local_access[uri].RequestAccess(req, 
963                            fedid(file=self.cert_file))
964                    r = { 'RequestAccessResponseBody' : r }
965                else:
966                    r = self.call_RequestAccess(uri, req, 
967                            self.cert_file, self.cert_pwd, self.trusted_certs)
968            except service_error, e:
969                if e.code == service_error.access:
970                    self.log.debug("[get_access] Access denied")
971                    r = None
972                    continue
973                else:
974                    raise e
975
976            if r.has_key('RequestAccessResponseBody'):
977                # Through to here we have a valid response, not a fault.
978                # Access denied is a fault, so something better or worse than
979                # access denied has happened.
980                r = r['RequestAccessResponseBody']
981                self.log.debug("[get_access] Access granted")
982                break
983            else:
984                raise service_error(service_error.protocol,
985                        "Bad proxy response")
986       
987        if not r:
988            raise service_error(service_error.access, 
989                    "Access denied by %s (%s)" % (tb, uri))
990
991        e = r['emulab']
992        p = e['project']
993        tbparam[tb] = { 
994                "boss": e['boss'],
995                "host": e['ops'],
996                "domain": e['domain'],
997                "fs": e['fileServer'],
998                "eventserver": e['eventServer'],
999                "project": unpack_id(p['name']),
1000                "emulab" : e,
1001                "allocID" : r['allocID'],
1002                }
1003        # Make the testbed name be the label the user applied
1004        p['testbed'] = {'localname': tb }
1005
1006        for u in p['user']:
1007            role = u.get('role', None)
1008            if role == 'experimentCreation':
1009                tbparam[tb]['user'] = unpack_id(u['userID'])
1010                break
1011        else:
1012            raise service_error(service_error.internal, 
1013                    "No createExperimentUser from %s" %tb)
1014
1015        for a in e['fedAttr']:
1016            if a['attribute']:
1017                key = translate_attr.get(a['attribute'].lower(), None)
1018                if key:
1019                    tbparam[tb][key]= a['value']
1020       
1021    def release_access(self, tb, aid):
1022        """
1023        Release access to testbed through fedd
1024        """
1025
1026        uri = self.tbmap.get(tb, None)
1027        if not uri:
1028            raise service_error(serice_error.server_config, 
1029                    "Unknown testbed: %s" % tb)
1030
1031        if self.local_access.has_key(uri):
1032            resp = self.local_access[uri].ReleaseAccess(\
1033                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1034                    fedid(file=self.cert_file))
1035            resp = { 'ReleaseAccessResponseBody': resp } 
1036        else:
1037            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1038                    self.cert_file, self.cert_pwd, self.trusted_certs)
1039
1040        # better error coding
1041
1042    def remote_splitter(self, uri, desc, master):
1043
1044        req = {
1045                'description' : { 'ns2description': desc },
1046                'master': master,
1047                'include_fedkit': bool(self.fedkit)
1048            }
1049
1050        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1051                self.trusted_certs)
1052
1053        if r.has_key('Ns2SplitResponseBody'):
1054            r = r['Ns2SplitResponseBody']
1055            if r.has_key('output'):
1056                return r['output'].splitlines()
1057            else:
1058                raise service_error(service_error.protocol, 
1059                        "Bad splitter response (no output)")
1060        else:
1061            raise service_error(service_error.protocol, "Bad splitter response")
1062       
1063    class current_testbed:
1064        """
1065        Object for collecting the current testbed description.  The testbed
1066        description is saved to a file with the local testbed variables
1067        subsittuted line by line.
1068        """
1069        def __init__(self, eid, tmpdir, fedkit):
1070            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1071            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1072            self.current_testbed = None
1073            self.testbed_file = None
1074
1075            self.def_expstart = \
1076                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1077            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1078            self.def_gwstart = \
1079                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1080            self.def_mgwstart = \
1081                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1082            self.def_gwimage = "FBSD61-TUNNEL2";
1083            self.def_gwtype = "pc";
1084
1085            self.eid = eid
1086            self.tmpdir = tmpdir
1087            self.fedkit = fedkit
1088
1089        def __call__(self, line, master, allocated, tbparams):
1090            # Capture testbed topology descriptions
1091            if self.current_testbed == None:
1092                m = self.begin_testbed.match(line)
1093                if m != None:
1094                    self.current_testbed = m.group(1)
1095                    if self.current_testbed == None:
1096                        raise service_error(service_error.req,
1097                                "Bad request format (unnamed testbed)")
1098                    allocated[self.current_testbed] = \
1099                            allocated.get(self.current_testbed,0) + 1
1100                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1101                    if not os.path.exists(tb_dir):
1102                        try:
1103                            os.mkdir(tb_dir)
1104                        except IOError:
1105                            raise service_error(service_error.internal,
1106                                    "Cannot create %s" % tb_dir)
1107                    try:
1108                        self.testbed_file = open("%s/%s.%s.tcl" %
1109                                (tb_dir, self.eid, self.current_testbed), 'w')
1110                    except IOError:
1111                        self.testbed_file = None
1112                    return True
1113                else: return False
1114            else:
1115                m = self.end_testbed.match(line)
1116                if m != None:
1117                    if m.group(1) != self.current_testbed:
1118                        raise service_error(service_error.internal, 
1119                                "Mismatched testbed markers!?")
1120                    if self.testbed_file != None: 
1121                        self.testbed_file.close()
1122                        self.testbed_file = None
1123                    self.current_testbed = None
1124                elif self.testbed_file:
1125                    # Substitute variables and put the line into the local
1126                    # testbed file.
1127                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1128                            self.def_gwtype)
1129                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1130                            self.def_gwimage)
1131                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1132                            self.def_mgwstart)
1133                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1134                            self.def_mexpstart)
1135                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1136                            self.def_gwstart)
1137                    expstart = tbparams[self.current_testbed].get('expstart', 
1138                            self.def_expstart)
1139                    project = tbparams[self.current_testbed].get('project')
1140                    line = re.sub("GWTYPE", gwtype, line)
1141                    line = re.sub("GWIMAGE", gwimage, line)
1142                    if self.current_testbed == master:
1143                        line = re.sub("GWSTART", mgwstart, line)
1144                        line = re.sub("EXPSTART", mexpstart, line)
1145                    else:
1146                        line = re.sub("GWSTART", gwstart, line)
1147                        line = re.sub("EXPSTART", expstart, line)
1148                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1149                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1150                    line = re.sub("EID", self.eid, line)
1151                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1152                            (project, self.eid), line)
1153                    if self.fedkit:
1154                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1155                                line)
1156                    print >>self.testbed_file, line
1157                return True
1158
1159    class allbeds:
1160        """
1161        Process the Allbeds section.  Get access to each federant and save the
1162        parameters in tbparams
1163        """
1164        def __init__(self, get_access):
1165            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1166            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1167            self.in_allbeds = False
1168            self.get_access = get_access
1169
1170        def __call__(self, line, user, tbparams, master, export_project,
1171                access_user):
1172            # Testbed access parameters
1173            if not self.in_allbeds:
1174                if self.begin_allbeds.match(line):
1175                    self.in_allbeds = True
1176                    return True
1177                else:
1178                    return False
1179            else:
1180                if self.end_allbeds.match(line):
1181                    self.in_allbeds = False
1182                else:
1183                    nodes = line.split('|')
1184                    tb = nodes.pop(0)
1185                    self.get_access(tb, nodes, user, tbparams, master,
1186                            export_project, access_user)
1187                return True
1188
1189    class gateways:
1190        def __init__(self, eid, master, tmpdir, gw_pubkey,
1191                gw_secretkey, copy_file, fedkit):
1192            self.begin_gateways = \
1193                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1194            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1195            self.current_gateways = None
1196            self.control_gateway = None
1197            self.active_end = { }
1198
1199            self.eid = eid
1200            self.master = master
1201            self.tmpdir = tmpdir
1202            self.gw_pubkey_base = gw_pubkey
1203            self.gw_secretkey_base = gw_secretkey
1204
1205            self.copy_file = copy_file
1206            self.fedkit = fedkit
1207
1208
1209        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1210                active_end, tbparams, dtb, myname, desthost, type):
1211            """
1212            Produce a gateway configuration file from a gateways line.
1213            """
1214
1215            sproject = tbparams[gw].get('project', 'project')
1216            dproject = tbparams[dtb].get('project', 'project')
1217            sdomain = ".%s.%s%s" % (eid, sproject,
1218                    tbparams[gw].get('domain', ".example.com"))
1219            ddomain = ".%s.%s%s" % (eid, dproject,
1220                    tbparams[dtb].get('domain', ".example.com"))
1221            boss = tbparams[master].get('boss', "boss")
1222            fs = tbparams[master].get('fs', "fs")
1223            event_server = "%s%s" % \
1224                    (tbparams[gw].get('eventserver', "event_server"),
1225                            tbparams[gw].get('domain', "example.com"))
1226            remote_event_server = "%s%s" % \
1227                    (tbparams[dtb].get('eventserver', "event_server"),
1228                            tbparams[dtb].get('domain', "example.com"))
1229            seer_control = "%s%s" % \
1230                    (tbparams[gw].get('control', "control"), sdomain)
1231            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1232
1233            if self.fedkit:
1234                remote_script_dir = "/usr/local/federation/bin"
1235                local_script_dir = "/usr/local/federation/bin"
1236            else:
1237                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1238                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1239
1240            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1241            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1242            tunnel_cfg = tbparams[gw].get("tun", "false")
1243
1244            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1245            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1246
1247            # translate to lower case so the `hostname` hack for specifying
1248            # configuration files works.
1249            conf_file = conf_file.lower();
1250            remote_conf_file = remote_conf_file.lower();
1251
1252            if dtb == master:
1253                active = "false"
1254            elif gw == master:
1255                active = "true"
1256            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1257                active = "false"
1258            else:
1259                active_end['%s-%s' % (gw, dtb)] = 1
1260                active = "true"
1261
1262            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1263            print >>gwconfig, "Active: %s" % active
1264            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1265            if tunnel_iface:
1266                print >>gwconfig, "Interface: %s" % tunnel_iface
1267            print >>gwconfig, "BossName: %s" % boss
1268            print >>gwconfig, "FsName: %s" % fs
1269            print >>gwconfig, "EventServerName: %s" % event_server
1270            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1271            print >>gwconfig, "SeerControl: %s" % seer_control
1272            print >>gwconfig, "Type: %s" % type
1273            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1274            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1275                    local_script_dir
1276            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1277            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1278            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1279                    (remote_conf_dir, remote_conf_file)
1280            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1281            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1282            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1283            gwconfig.close()
1284
1285            return active == "true"
1286
1287        def __call__(self, line, allocated, tbparams):
1288            # Process gateways
1289            if not self.current_gateways:
1290                m = self.begin_gateways.match(line)
1291                if m:
1292                    self.current_gateways = m.group(1)
1293                    if allocated.has_key(self.current_gateways):
1294                        # This test should always succeed
1295                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1296                        if not os.path.exists(tb_dir):
1297                            try:
1298                                os.mkdir(tb_dir)
1299                            except IOError:
1300                                raise service_error(service_error.internal,
1301                                        "Cannot create %s" % tb_dir)
1302                    else:
1303                        # XXX
1304                        self.log.error("[gateways]: Ignoring gateways for " + \
1305                                "unknown testbed %s" % self.current_gateways)
1306                        self.current_gateways = None
1307                    return True
1308                else:
1309                    return False
1310            else:
1311                m = self.end_gateways.match(line)
1312                if m :
1313                    if m.group(1) != self.current_gateways:
1314                        raise service_error(service_error.internal,
1315                                "Mismatched gateway markers!?")
1316                    if self.control_gateway:
1317                        try:
1318                            cc = open("%s/%s/client.conf" %
1319                                    (self.tmpdir, self.current_gateways), 'w')
1320                            print >>cc, "ControlGateway: %s" % \
1321                                    self.control_gateway
1322                            if tbparams[self.master].has_key('smbshare'):
1323                                print >>cc, "SMBSHare: %s" % \
1324                                        tbparams[self.master]['smbshare']
1325                            print >>cc, "ProjectUser: %s" % \
1326                                    tbparams[self.master]['user']
1327                            print >>cc, "ProjectName: %s" % \
1328                                    tbparams[self.master]['project']
1329                            cc.close()
1330                        except IOError:
1331                            raise service_error(service_error.internal,
1332                                    "Error creating client config")
1333                        try:
1334                            cc = open("%s/%s/seer.conf" %
1335                                    (self.tmpdir, self.current_gateways),
1336                                    'w')
1337                            if self.current_gateways != self.master:
1338                                print >>cc, "ControlNode: %s" % \
1339                                        self.control_gateway
1340                            print >>cc, "ExperimentID: %s/%s" % \
1341                                    ( tbparams[self.master]['project'], \
1342                                    self.eid )
1343                            cc.close()
1344                        except IOError:
1345                            raise service_error(service_error.internal,
1346                                    "Error creating seer config")
1347                    else:
1348                        debug.error("[gateways]: No control gateway for %s" %\
1349                                    self.current_gateways)
1350                    self.current_gateways = None
1351                else:
1352                    dtb, myname, desthost, type = line.split(" ")
1353
1354                    if type == "control" or type == "both":
1355                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1356                                self.eid, 
1357                                tbparams[self.current_gateways]['project'],
1358                                tbparams[self.current_gateways]['domain'])
1359                    try:
1360                        active = self.gateway_conf_file(self.current_gateways,
1361                                self.master, self.eid, self.gw_pubkey_base,
1362                                self.gw_secretkey_base,
1363                                self.active_end, tbparams, dtb, myname,
1364                                desthost, type)
1365                    except IOError, e:
1366                        raise service_error(service_error.internal,
1367                                "Failed to write config file for %s" % \
1368                                        self.current_gateway)
1369           
1370                    gw_pubkey = "%s/keys/%s" % \
1371                            (self.tmpdir, self.gw_pubkey_base)
1372                    gw_secretkey = "%s/keys/%s" % \
1373                            (self.tmpdir, self.gw_secretkey_base)
1374
1375                    pkfile = "%s/%s/%s" % \
1376                            ( self.tmpdir, self.current_gateways, 
1377                                    self.gw_pubkey_base)
1378                    skfile = "%s/%s/%s" % \
1379                            ( self.tmpdir, self.current_gateways, 
1380                                    self.gw_secretkey_base)
1381
1382                    if not os.path.exists(pkfile):
1383                        try:
1384                            self.copy_file(gw_pubkey, pkfile)
1385                        except IOError:
1386                            service_error(service_error.internal,
1387                                    "Failed to copy pubkey file")
1388
1389                    if active and not os.path.exists(skfile):
1390                        try:
1391                            self.copy_file(gw_secretkey, skfile)
1392                        except IOError:
1393                            service_error(service_error.internal,
1394                                    "Failed to copy secretkey file")
1395                return True
1396
1397    class shunt_to_file:
1398        """
1399        Simple class to write data between two regexps to a file.
1400        """
1401        def __init__(self, begin, end, filename):
1402            """
1403            Begin shunting on a match of begin, stop on end, send data to
1404            filename.
1405            """
1406            self.begin = re.compile(begin)
1407            self.end = re.compile(end)
1408            self.in_shunt = False
1409            self.file = None
1410            self.filename = filename
1411
1412        def __call__(self, line):
1413            """
1414            Call this on each line in the input that may be shunted.
1415            """
1416            if not self.in_shunt:
1417                if self.begin.match(line):
1418                    self.in_shunt = True
1419                    try:
1420                        self.file = open(self.filename, "w")
1421                    except:
1422                        self.file = None
1423                        raise
1424                    return True
1425                else:
1426                    return False
1427            else:
1428                if self.end.match(line):
1429                    if self.file: 
1430                        self.file.close()
1431                        self.file = None
1432                    self.in_shunt = False
1433                else:
1434                    if self.file:
1435                        print >>self.file, line
1436                return True
1437
1438    class shunt_to_list:
1439        """
1440        Same interface as shunt_to_file.  Data collected in self.list, one list
1441        element per line.
1442        """
1443        def __init__(self, begin, end):
1444            self.begin = re.compile(begin)
1445            self.end = re.compile(end)
1446            self.in_shunt = False
1447            self.list = [ ]
1448       
1449        def __call__(self, line):
1450            if not self.in_shunt:
1451                if self.begin.match(line):
1452                    self.in_shunt = True
1453                    return True
1454                else:
1455                    return False
1456            else:
1457                if self.end.match(line):
1458                    self.in_shunt = False
1459                else:
1460                    self.list.append(line)
1461                return True
1462
1463    class shunt_to_string:
1464        """
1465        Same interface as shunt_to_file.  Data collected in self.str, all in
1466        one string.
1467        """
1468        def __init__(self, begin, end):
1469            self.begin = re.compile(begin)
1470            self.end = re.compile(end)
1471            self.in_shunt = False
1472            self.str = ""
1473       
1474        def __call__(self, line):
1475            if not self.in_shunt:
1476                if self.begin.match(line):
1477                    self.in_shunt = True
1478                    return True
1479                else:
1480                    return False
1481            else:
1482                if self.end.match(line):
1483                    self.in_shunt = False
1484                else:
1485                    self.str += line
1486                return True
1487
1488    def create_experiment(self, req, fid):
1489        """
1490        The external interface to experiment creation called from the
1491        dispatcher.
1492
1493        Creates a working directory, splits the incoming description using the
1494        splitter script and parses out the avrious subsections using the
1495        lcasses above.  Once each sub-experiment is created, use pooled threads
1496        to instantiate them and start it all up.
1497        """
1498
1499        if not self.auth.check_attribute(fid, 'create'):
1500            raise service_error(service_error.access, "Create access denied")
1501
1502        try:
1503            tmpdir = tempfile.mkdtemp(prefix="split-")
1504        except IOError:
1505            raise service_error(service_error.internal, "Cannot create tmp dir")
1506
1507        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1508        gw_secretkey_base = "fed.%s" % self.ssh_type
1509        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1510        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1511        tclfile = tmpdir + "/experiment.tcl"
1512        tbparams = { }
1513        try:
1514            access_user = self.accessdb[fid]
1515        except KeyError:
1516            raise service_error(service_error.internal,
1517                    "Access map and authorizer out of sync in " + \
1518                            "create_experiment for fedid %s"  % fid)
1519
1520        pid = "dummy"
1521        gid = "dummy"
1522        # XXX
1523        fail_soft = False
1524
1525        try:
1526            os.mkdir(tmpdir+"/keys")
1527        except OSError:
1528            raise service_error(service_error.internal,
1529                    "Can't make temporary dir")
1530
1531        req = req.get('CreateRequestBody', None)
1532        if not req:
1533            raise service_error(service_error.req,
1534                    "Bad request format (no CreateRequestBody)")
1535        # The tcl parser needs to read a file so put the content into that file
1536        descr=req.get('experimentdescription', None)
1537        if descr:
1538            file_content=descr.get('ns2description', None)
1539            if file_content:
1540                try:
1541                    f = open(tclfile, 'w')
1542                    f.write(file_content)
1543                    f.close()
1544                except IOError:
1545                    raise service_error(service_error.internal,
1546                            "Cannot write temp experiment description")
1547            else:
1548                raise service_error(service_error.req, 
1549                        "Only ns2descriptions supported")
1550        else:
1551            raise service_error(service_error.req, "No experiment description")
1552
1553        if req.has_key('experimentID') and \
1554                req['experimentID'].has_key('localname'):
1555            eid = req['experimentID']['localname']
1556            self.state_lock.acquire()
1557            while (self.state.has_key(eid)):
1558                eid += random.choice(string.ascii_letters)
1559            # To avoid another thread picking this localname
1560            self.state[eid] = "placeholder"
1561            self.state_lock.release()
1562        else:
1563            eid = self.exp_stem
1564            for i in range(0,5):
1565                eid += random.choice(string.ascii_letters)
1566            self.state_lock.acquire()
1567            while (self.state.has_key(eid)):
1568                eid = self.exp_stem
1569                for i in range(0,5):
1570                    eid += random.choice(string.ascii_letters)
1571            # To avoid another thread picking this localname
1572            self.state[eid] = "placeholder"
1573            self.state_lock.release()
1574
1575        try: 
1576            # This catches exceptions to clear the placeholder if necessary
1577            try:
1578                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1579            except ValueError:
1580                raise service_error(service_error.server_config, 
1581                        "Bad key type (%s)" % self.ssh_type)
1582
1583            user = req.get('user', None)
1584            if user == None:
1585                raise service_error(service_error.req, "No user")
1586
1587            master = req.get('master', None)
1588            if not master:
1589                raise service_error(service_error.req,
1590                        "No master testbed label")
1591            export_project = req.get('exportProject', None)
1592            if not export_project:
1593                raise service_error(service_error.req, "No export project")
1594           
1595            if self.splitter_url:
1596                self.log.debug("Calling remote splitter at %s" % \
1597                        self.splitter_url)
1598                split_data = self.remote_splitter(self.splitter_url,
1599                        file_content, master)
1600            else:
1601                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1602                    str(self.muxmax), '-m', master]
1603
1604                if self.fedkit:
1605                    tclcmd.append('-k')
1606
1607                tclcmd.extend([pid, gid, eid, tclfile])
1608
1609                self.log.debug("running local splitter %s", " ".join(tclcmd))
1610                tclparser = Popen(tclcmd, stdout=PIPE)
1611                split_data = tclparser.stdout
1612
1613            allocated = { }         # Testbeds we can access
1614            started = { }           # Testbeds where a sub-experiment started
1615                                # successfully
1616
1617            # Objects to parse the splitter output (defined above)
1618            parse_current_testbed = self.current_testbed(eid, tmpdir,
1619                    self.fedkit)
1620            parse_allbeds = self.allbeds(self.get_access)
1621            parse_gateways = self.gateways(eid, master, tmpdir,
1622                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1623                    self.fedkit)
1624            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1625                        "^#\s+End\s+Vtopo")
1626            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1627                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1628            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1629                    "^#\s+End\s+tarfiles")
1630            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1631                    "^#\s+End\s+rpms")
1632
1633            # Working on the split data
1634            for line in split_data:
1635                line = line.rstrip()
1636                if parse_current_testbed(line, master, allocated, tbparams):
1637                    continue
1638                elif parse_allbeds(line, user, tbparams, master, export_project,
1639                        access_user):
1640                    continue
1641                elif parse_gateways(line, allocated, tbparams):
1642                    continue
1643                elif parse_vtopo(line):
1644                    continue
1645                elif parse_hostnames(line):
1646                    continue
1647                elif parse_tarfiles(line):
1648                    continue
1649                elif parse_rpms(line):
1650                    continue
1651                else:
1652                    raise service_error(service_error.internal, 
1653                            "Bad tcl parse? %s" % line)
1654            # Virtual topology and visualization
1655            vtopo = self.gentopo(parse_vtopo.str)
1656            if not vtopo:
1657                raise service_error(service_error.internal, 
1658                        "Failed to generate virtual topology")
1659
1660            vis = self.genviz(vtopo)
1661            if not vis:
1662                raise service_error(service_error.internal, 
1663                        "Failed to generate visualization")
1664           
1665            # save federant information
1666            for k in allocated.keys():
1667                tbparams[k]['federant'] = {\
1668                        'name': [ { 'localname' : eid} ],\
1669                        'emulab': tbparams[k]['emulab'],\
1670                        'allocID' : tbparams[k]['allocID'],\
1671                        'master' : k == master,\
1672                    }
1673
1674
1675            # Copy tarfiles and rpms needed at remote sites into a staging area
1676            try:
1677                if self.fedkit:
1678                    parse_tarfiles.list.append(self.fedkit)
1679                for t in parse_tarfiles.list:
1680                    if not os.path.exists("%s/tarfiles" % tmpdir):
1681                        os.mkdir("%s/tarfiles" % tmpdir)
1682                    self.copy_file(t, "%s/tarfiles/%s" % \
1683                            (tmpdir, os.path.basename(t)))
1684                for r in parse_rpms.list:
1685                    if not os.path.exists("%s/rpms" % tmpdir):
1686                        os.mkdir("%s/rpms" % tmpdir)
1687                    self.copy_file(r, "%s/rpms/%s" % \
1688                            (tmpdir, os.path.basename(r)))
1689            except IOError, e:
1690                raise service_error(service_error.internal, 
1691                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1692
1693        except service_error, e:
1694            # If something goes wrong in the parse (usually an access error)
1695            # clear the placeholder state.  From here on out the code delays
1696            # exceptions.
1697            self.state_lock.acquire()
1698            del self.state[eid]
1699            self.state_lock.release()
1700            raise e
1701
1702        thread_pool_info = self.thread_pool()
1703        threads = [ ]
1704
1705        for tb in [ k for k in allocated.keys() if k != master]:
1706            # Wait until we have a free slot to start the next testbed load
1707            thread_pool_info.acquire()
1708            while thread_pool_info.started - \
1709                    thread_pool_info.terminated >= self.nthreads:
1710                thread_pool_info.wait()
1711            thread_pool_info.release()
1712
1713            # Create and start a thread to start the segment, and save it to
1714            # get the return value later
1715            t  = self.pooled_thread(target=self.start_segment, 
1716                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1717                    pdata=thread_pool_info, trace_file=self.trace_file)
1718            threads.append(t)
1719            t.start()
1720
1721        # Wait until all finish (the first clause of the while is to make sure
1722        # one starts)
1723        thread_pool_info.acquire()
1724        while thread_pool_info.started == 0 or \
1725                thread_pool_info.started > thread_pool_info.terminated:
1726            thread_pool_info.wait()
1727        thread_pool_info.release()
1728
1729        # If none failed, start the master
1730        failed = [ t.getName() for t in threads if not t.rv ]
1731
1732        if len(failed) == 0:
1733            if not self.start_segment(master, eid, tbparams, tmpdir):
1734                failed.append(master)
1735
1736        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1737        # If one failed clean up, unless fail_soft is set
1738        if failed:
1739            if not fail_soft:
1740                for tb in succeeded:
1741                    self.stop_segment(tb, eid, tbparams)
1742                # release the allocations
1743                for tb in tbparams.keys():
1744                    self.release_access(tb, tbparams[tb]['allocID'])
1745                # Remove the placeholder
1746                self.state_lock.acquire()
1747                del self.state[eid]
1748                self.state_lock.release()
1749
1750                raise service_error(service_error.federant,
1751                    "Swap in failed on %s" % ",".join(failed))
1752        else:
1753            self.log.info("[start_segment]: Experiment %s started" % eid)
1754
1755        # Generate an ID for the experiment (slice) and a certificate that the
1756        # allocator can use to prove they own it.  We'll ship it back through
1757        # the encrypted connection.
1758        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1759
1760        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1761
1762        # Walk up tmpdir, deleting as we go
1763        for path, dirs, files in os.walk(tmpdir, topdown=False):
1764            for f in files:
1765                os.remove(os.path.join(path, f))
1766            for d in dirs:
1767                os.rmdir(os.path.join(path, d))
1768        os.rmdir(tmpdir)
1769
1770        # The deepcopy prevents the allocation ID and other binaries from being
1771        # translated into other formats
1772        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1773                for tb in tbparams.keys() \
1774                    if tbparams[tb].has_key('federant') ],\
1775                    'vtopo': vtopo,\
1776                    'vis' : vis,
1777                    'experimentID' : [\
1778                            { 'fedid': copy.copy(expid) }, \
1779                            { 'localname': eid },\
1780                        ],\
1781                    'experimentAccess': { 'X509' : expcert },\
1782                }
1783        # remove the allocationID info from each federant
1784        for f in resp['federant']:
1785            if f.has_key('allocID'): del f['allocID']
1786
1787        # Insert the experiment into our state and update the disk copy
1788        self.state_lock.acquire()
1789        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1790                for tb in tbparams.keys() \
1791                    if tbparams[tb].has_key('federant') ],\
1792                    'vtopo': vtopo,\
1793                    'vis' : vis,
1794                    'owner': fid,
1795                    'experimentID' : [\
1796                            { 'fedid': expid }, { 'localname': eid },\
1797                        ],\
1798                }
1799        self.state[eid] = self.state[expid]
1800        if self.state_filename: self.write_state()
1801        self.state_lock.release()
1802
1803        self.auth.set_attribute(fid, expid)
1804        self.auth.set_attribute(expid, expid)
1805
1806        if not failed:
1807            return resp
1808        else:
1809            raise service_error(service_error.partial, \
1810                    "Partial swap in on %s" % ",".join(succeeded))
1811
1812    def check_experiment_access(self, fid, key):
1813        """
1814        Confirm that the fid has access to the experiment.  Though a request
1815        may be made in terms of a local name, the access attribute is always
1816        the experiment's fedid.
1817        """
1818        if not isinstance(key, fedid):
1819            self.state_lock.acquire()
1820            if self.state.has_key(key):
1821                try:
1822                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1823                            if f.has_key('fedid') ]
1824                except KeyError:
1825                    self.state_lock.release()
1826                    raise service_error(service_error.internal, 
1827                            "No fedid for experiment %s when checking " +\
1828                                    "access(!?)" % key)
1829                if len(kl) == 1:
1830                    key = kl[0]
1831                else:
1832                    self.state_lock.release()
1833                    raise service_error(service_error.internal, 
1834                            "multiple fedids for experiment %s when " +\
1835                                    "checking access(!?)" % key)
1836            else:
1837                self.state_lock.release()
1838                raise service_error(service_error.access, "Access Denied")
1839            self.state_lock.release()
1840
1841        if self.auth.check_attribute(fid, key):
1842            return True
1843        else:
1844            raise service_error(service_error.access, "Access Denied")
1845
1846
1847
1848    def get_vtopo(self, req, fid):
1849        """
1850        Return the stored virtual topology for this experiment
1851        """
1852        rv = None
1853
1854        req = req.get('VtopoRequestBody', None)
1855        if not req:
1856            raise service_error(service_error.req,
1857                    "Bad request format (no VtopoRequestBody)")
1858        exp = req.get('experiment', None)
1859        if exp:
1860            if exp.has_key('fedid'):
1861                key = exp['fedid']
1862                keytype = "fedid"
1863            elif exp.has_key('localname'):
1864                key = exp['localname']
1865                keytype = "localname"
1866            else:
1867                raise service_error(service_error.req, "Unknown lookup type")
1868        else:
1869            raise service_error(service_error.req, "No request?")
1870
1871        self.check_experiment_access(fid, key)
1872
1873        self.state_lock.acquire()
1874        if self.state.has_key(key):
1875            rv = { 'experiment' : {keytype: key },\
1876                    'vtopo': self.state[key]['vtopo'],\
1877                }
1878        self.state_lock.release()
1879
1880        if rv: return rv
1881        else: raise service_error(service_error.req, "No such experiment")
1882
1883    def get_vis(self, req, fid):
1884        """
1885        Return the stored visualization for this experiment
1886        """
1887        rv = None
1888
1889        req = req.get('VisRequestBody', None)
1890        if not req:
1891            raise service_error(service_error.req,
1892                    "Bad request format (no VisRequestBody)")
1893        exp = req.get('experiment', None)
1894        if exp:
1895            if exp.has_key('fedid'):
1896                key = exp['fedid']
1897                keytype = "fedid"
1898            elif exp.has_key('localname'):
1899                key = exp['localname']
1900                keytype = "localname"
1901            else:
1902                raise service_error(service_error.req, "Unknown lookup type")
1903        else:
1904            raise service_error(service_error.req, "No request?")
1905
1906        self.check_experiment_access(fid, key)
1907
1908        self.state_lock.acquire()
1909        if self.state.has_key(key):
1910            rv =  { 'experiment' : {keytype: key },\
1911                    'vis': self.state[key]['vis'],\
1912                    }
1913        self.state_lock.release()
1914
1915        if rv: return rv
1916        else: raise service_error(service_error.req, "No such experiment")
1917
1918    def get_info(self, req, fid):
1919        """
1920        Return all the stored info about this experiment
1921        """
1922        rv = None
1923
1924        req = req.get('InfoRequestBody', None)
1925        if not req:
1926            raise service_error(service_error.req,
1927                    "Bad request format (no VisRequestBody)")
1928        exp = req.get('experiment', None)
1929        if exp:
1930            if exp.has_key('fedid'):
1931                key = exp['fedid']
1932                keytype = "fedid"
1933            elif exp.has_key('localname'):
1934                key = exp['localname']
1935                keytype = "localname"
1936            else:
1937                raise service_error(service_error.req, "Unknown lookup type")
1938        else:
1939            raise service_error(service_error.req, "No request?")
1940
1941        self.check_experiment_access(fid, key)
1942
1943        # The state may be massaged by the service function that called
1944        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1945        # state.
1946        self.state_lock.acquire()
1947        if self.state.has_key(key):
1948            rv = copy.deepcopy(self.state[key])
1949        self.state_lock.release()
1950        # Remove the owner info
1951        del rv['owner']
1952        # remove the allocationID info from each federant
1953        for f in rv['federant']:
1954            if f.has_key('allocID'): del f['allocID']
1955
1956        if rv: return rv
1957        else: raise service_error(service_error.req, "No such experiment")
1958
1959
1960    def terminate_experiment(self, req, fid):
1961        """
1962        Swap this experiment out on the federants and delete the shared
1963        information
1964        """
1965        tbparams = { }
1966        req = req.get('TerminateRequestBody', None)
1967        if not req:
1968            raise service_error(service_error.req,
1969                    "Bad request format (no TerminateRequestBody)")
1970        exp = req.get('experiment', None)
1971        if exp:
1972            if exp.has_key('fedid'):
1973                key = exp['fedid']
1974                keytype = "fedid"
1975            elif exp.has_key('localname'):
1976                key = exp['localname']
1977                keytype = "localname"
1978            else:
1979                raise service_error(service_error.req, "Unknown lookup type")
1980        else:
1981            raise service_error(service_error.req, "No request?")
1982
1983        self.check_experiment_access(fid, key)
1984
1985        self.state_lock.acquire()
1986        fed_exp = self.state.get(key, None)
1987
1988        if fed_exp:
1989            # This branch of the conditional holds the lock to generate a
1990            # consistent temporary tbparams variable to deallocate experiments.
1991            # It releases the lock to do the deallocations and reacquires it to
1992            # remove the experiment state when the termination is complete.
1993            ids = []
1994            #  experimentID is a list of dicts that are self-describing
1995            #  identifiers.  This finds all the fedids and localnames - the
1996            #  keys of self.state - and puts them into ids.
1997            for id in fed_exp.get('experimentID', []):
1998                if id.has_key('fedid'): ids.append(id['fedid'])
1999                if id.has_key('localname'): ids.append(id['localname'])
2000
2001            # Construct enough of the tbparams to make the stop_segment calls
2002            # work
2003            for fed in fed_exp['federant']:
2004                try:
2005                    for e in fed['name']:
2006                        eid = e.get('localname', None)
2007                        if eid: break
2008                    else:
2009                        continue
2010
2011                    p = fed['emulab']['project']
2012
2013                    project = p['name']['localname']
2014                    tb = p['testbed']['localname']
2015                    user = p['user'][0]['userID']['localname']
2016
2017                    domain = fed['emulab']['domain']
2018                    host  = fed['emulab']['ops']
2019                    aid = fed['allocID']
2020                except KeyError, e:
2021                    continue
2022                tbparams[tb] = {\
2023                        'user': user,\
2024                        'domain': domain,\
2025                        'project': project,\
2026                        'host': host,\
2027                        'eid': eid,\
2028                        'aid': aid,\
2029                    }
2030            self.state_lock.release()
2031
2032            # Stop everyone.
2033            for tb in tbparams.keys():
2034                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2035
2036            # release the allocations
2037            for tb in tbparams.keys():
2038                self.release_access(tb, tbparams[tb]['aid'])
2039
2040            # Remove the terminated experiment
2041            self.state_lock.acquire()
2042            for id in ids:
2043                if self.state.has_key(id): del self.state[id]
2044
2045            if self.state_filename: self.write_state()
2046            self.state_lock.release()
2047
2048            return { 'experiment': exp }
2049        else:
2050            # Don't forget to release the lock
2051            self.state_lock.release()
2052            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.