source: fedd/federation/experiment_control.py @ 79b6596

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 79b6596 was 79b6596, checked in by Ted Faber <faber@…>, 15 years ago

Initial swing at timeouts.

Also, the filesystem config script needs to take the ship_configs commands into
account.

  • Property mode set to 100644
File size: 84.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44   
45    class thread_pool:
46        """
47        A class to keep track of a set of threads all invoked for the same
48        task.  Manages the mutual exclusion of the states.
49        """
50        def __init__(self, nthreads):
51            """
52            Start a pool.
53            """
54            self.changed = Condition()
55            self.started = 0
56            self.terminated = 0
57            self.nthreads = nthreads
58
59        def acquire(self):
60            """
61            Get the pool's lock.
62            """
63            self.changed.acquire()
64
65        def release(self):
66            """
67            Release the pool's lock.
68            """
69            self.changed.release()
70
71        def wait(self, timeout = None):
72            """
73            Wait for a pool thread to start or stop.
74            """
75            self.changed.wait(timeout)
76
77        def start(self):
78            """
79            Called by a pool thread to report starting.
80            """
81            self.changed.acquire()
82            self.started += 1
83            self.changed.notifyAll()
84            self.changed.release()
85
86        def terminate(self):
87            """
88            Called by a pool thread to report finishing.
89            """
90            self.changed.acquire()
91            self.terminated += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def clear(self):
96            """
97            Clear all pool data.
98            """
99            self.changed.acquire()
100            self.started = 0
101            self.terminated =0
102            self.changed.notifyAll()
103            self.changed.release()
104
105        def wait_for_slot(self):
106            """
107            Wait until we have a free slot to start another pooled thread
108            """
109            self.acquire()
110            while self.started - self.terminated >= self.nthreads:
111                self.wait()
112            self.release()
113
114        def wait_for_all_done(self):
115            """
116            Wait until all active threads finish (and at least one has started)
117            """
118            self.acquire()
119            while self.started == 0 or self.started > self.terminated:
120                self.wait()
121            self.release()
122
123    class pooled_thread(Thread):
124        """
125        One of a set of threads dedicated to a specific task.  Uses the
126        thread_pool class above for coordination.
127        """
128        def __init__(self, group=None, target=None, name=None, args=(), 
129                kwargs={}, pdata=None, trace_file=None):
130            Thread.__init__(self, group, target, name, args, kwargs)
131            self.rv = None          # Return value of the ops in this thread
132            self.exception = None   # Exception that terminated this thread
133            self.target=target      # Target function to run on start()
134            self.args = args        # Args to pass to target
135            self.kwargs = kwargs    # Additional kw args
136            self.pdata = pdata      # thread_pool for this class
137            # Logger for this thread
138            self.log = logging.getLogger("fedd.experiment_control")
139       
140        def run(self):
141            """
142            Emulate Thread.run, except add pool data manipulation and error
143            logging.
144            """
145            if self.pdata:
146                self.pdata.start()
147
148            if self.target:
149                try:
150                    self.rv = self.target(*self.args, **self.kwargs)
151                except service_error, s:
152                    self.exception = s
153                    self.log.error("Thread exception: %s %s" % \
154                            (s.code_string(), s.desc))
155                except:
156                    self.exception = sys.exc_info()[1]
157                    self.log.error(("Unexpected thread exception: %s" +\
158                            "Trace %s") % (self.exception,\
159                                traceback.format_exc()))
160            if self.pdata:
161                self.pdata.terminate()
162
163    call_RequestAccess = service_caller('RequestAccess')
164    call_ReleaseAccess = service_caller('ReleaseAccess')
165    call_Ns2Split = service_caller('Ns2Split')
166
167    def __init__(self, config=None, auth=None):
168        """
169        Intialize the various attributes, most from the config object
170        """
171
172        def parse_tarfile_list(tf):
173            """
174            Parse a tarfile list from the configuration.  This is a set of
175            paths and tarfiles separated by spaces.
176            """
177            rv = [ ]
178            if tf is not None:
179                tl = tf.split()
180                while len(tl) > 1:
181                    p, t = tl[0:2]
182                    del tl[0:2]
183                    rv.append((p, t))
184            return rv
185
186        self.thread_with_rv = experiment_control_local.pooled_thread
187        self.thread_pool = experiment_control_local.thread_pool
188
189        self.cert_file = config.get("experiment_control", "cert_file")
190        if self.cert_file:
191            self.cert_pwd = config.get("experiment_control", "cert_pwd")
192        else:
193            self.cert_file = config.get("globals", "cert_file")
194            self.cert_pwd = config.get("globals", "cert_pwd")
195
196        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
197                or config.get("globals", "trusted_certs")
198
199        self.exp_stem = "fed-stem"
200        self.log = logging.getLogger("fedd.experiment_control")
201        set_log_level(config, "experiment_control", self.log)
202        self.muxmax = 2
203        self.nthreads = 2
204        self.randomize_experiments = False
205
206        self.scp_exec = "/usr/bin/scp"
207        self.splitter = None
208        self.ssh_exec="/usr/bin/ssh"
209        self.ssh_keygen = "/usr/bin/ssh-keygen"
210        self.ssh_identity_file = None
211
212
213        self.debug = config.getboolean("experiment_control", "create_debug")
214        self.state_filename = config.get("experiment_control", 
215                "experiment_state")
216        self.splitter_url = config.get("experiment_control", "splitter_uri")
217        self.fedkit = parse_tarfile_list(\
218                config.get("experiment_control", "fedkit"))
219        self.gatewaykit = parse_tarfile_list(\
220                config.get("experiment_control", "gatewaykit"))
221        accessdb_file = config.get("experiment_control", "accessdb")
222
223        self.ssh_pubkey_file = config.get("experiment_control", 
224                "ssh_pubkey_file")
225        self.ssh_privkey_file = config.get("experiment_control",
226                "ssh_privkey_file")
227        # NB for internal master/slave ops, not experiment setup
228        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
229        self.state = { }
230        self.state_lock = Lock()
231        self.tclsh = "/usr/local/bin/otclsh"
232        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
233                config.get("experiment_control", "tcl_splitter",
234                        "/usr/testbed/lib/ns2ir/parse.tcl")
235        mapdb_file = config.get("experiment_control", "mapdb")
236        self.trace_file = sys.stderr
237
238        self.def_expstart = \
239                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
240                "/tmp/federate";
241        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
242                "FEDDIR/hosts";
243        self.def_gwstart = \
244                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
245                "/tmp/bridge.log";
246        self.def_mgwstart = \
247                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
248                "/tmp/bridge.log";
249        self.def_gwimage = "FBSD61-TUNNEL2";
250        self.def_gwtype = "pc";
251        self.local_access = { }
252
253        if auth:
254            self.auth = auth
255        else:
256            self.log.error(\
257                    "[access]: No authorizer initialized, creating local one.")
258            auth = authorizer()
259
260
261        if self.ssh_pubkey_file:
262            try:
263                f = open(self.ssh_pubkey_file, 'r')
264                self.ssh_pubkey = f.read()
265                f.close()
266            except IOError:
267                raise service_error(service_error.internal,
268                        "Cannot read sshpubkey")
269        else:
270            raise service_error(service_error.internal, 
271                    "No SSH public key file?")
272
273        if not self.ssh_privkey_file:
274            raise service_error(service_error.internal, 
275                    "No SSH public key file?")
276
277
278        if mapdb_file:
279            self.read_mapdb(mapdb_file)
280        else:
281            self.log.warn("[experiment_control] No testbed map, using defaults")
282            self.tbmap = { 
283                    'deter':'https://users.isi.deterlab.net:23235',
284                    'emulab':'https://users.isi.deterlab.net:23236',
285                    'ucb':'https://users.isi.deterlab.net:23237',
286                    }
287
288        if accessdb_file:
289                self.read_accessdb(accessdb_file)
290        else:
291            raise service_error(service_error.internal,
292                    "No accessdb specified in config")
293
294        # Grab saved state.  OK to do this w/o locking because it's read only
295        # and only one thread should be in existence that can see self.state at
296        # this point.
297        if self.state_filename:
298            self.read_state()
299
300        # Dispatch tables
301        self.soap_services = {\
302                'Create': soap_handler('Create', self.create_experiment),
303                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
304                'Vis': soap_handler('Vis', self.get_vis),
305                'Info': soap_handler('Info', self.get_info),
306                'Terminate': soap_handler('Terminate', 
307                    self.terminate_experiment),
308        }
309
310        self.xmlrpc_services = {\
311                'Create': xmlrpc_handler('Create', self.create_experiment),
312                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
313                'Vis': xmlrpc_handler('Vis', self.get_vis),
314                'Info': xmlrpc_handler('Info', self.get_info),
315                'Terminate': xmlrpc_handler('Terminate',
316                    self.terminate_experiment),
317        }
318
319    def copy_file(self, src, dest, size=1024):
320        """
321        Exceedingly simple file copy.
322        """
323        s = open(src,'r')
324        d = open(dest, 'w')
325
326        buf = "x"
327        while buf != "":
328            buf = s.read(size)
329            d.write(buf)
330        s.close()
331        d.close()
332
333    # Call while holding self.state_lock
334    def write_state(self):
335        """
336        Write a new copy of experiment state after copying the existing state
337        to a backup.
338
339        State format is a simple pickling of the state dictionary.
340        """
341        if os.access(self.state_filename, os.W_OK):
342            self.copy_file(self.state_filename, \
343                    "%s.bak" % self.state_filename)
344        try:
345            f = open(self.state_filename, 'w')
346            pickle.dump(self.state, f)
347        except IOError, e:
348            self.log.error("Can't write file %s: %s" % \
349                    (self.state_filename, e))
350        except pickle.PicklingError, e:
351            self.log.error("Pickling problem: %s" % e)
352        except TypeError, e:
353            self.log.error("Pickling problem (TypeError): %s" % e)
354
355    # Call while holding self.state_lock
356    def read_state(self):
357        """
358        Read a new copy of experiment state.  Old state is overwritten.
359
360        State format is a simple pickling of the state dictionary.
361        """
362        try:
363            f = open(self.state_filename, "r")
364            self.state = pickle.load(f)
365            self.log.debug("[read_state]: Read state from %s" % \
366                    self.state_filename)
367        except IOError, e:
368            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
369                    % (self.state_filename, e))
370        except pickle.UnpicklingError, e:
371            self.log.warning(("[read_state]: No saved state: " + \
372                    "Unpickling failed: %s") % e)
373       
374        for k in self.state.keys():
375            try:
376                # This list should only have one element in it, but phrasing it
377                # as a for loop doesn't cost much, really.  We have to find the
378                # fedid elements anyway.
379                for eid in [ f['fedid'] \
380                        for f in self.state[k]['experimentID']\
381                            if f.has_key('fedid') ]:
382                    self.auth.set_attribute(self.state[k]['owner'], eid)
383            except KeyError, e:
384                self.log.warning("[read_state]: State ownership or identity " +\
385                        "misformatted in %s: %s" % (self.state_filename, e))
386
387
388    def read_accessdb(self, accessdb_file):
389        """
390        Read the mapping from fedids that can create experiments to their name
391        in the 3-level access namespace.  All will be asserted from this
392        testbed and can include the local username and porject that will be
393        asserted on their behalf by this fedd.  Each fedid is also added to the
394        authorization system with the "create" attribute.
395        """
396        self.accessdb = {}
397        # These are the regexps for parsing the db
398        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
399        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
400                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
401        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
402                "\s*->\s*(" + name_expr + ")\s*$")
403        lineno = 0
404
405        # Parse the mappings and store in self.authdb, a dict of
406        # fedid -> (proj, user)
407        try:
408            f = open(accessdb_file, "r")
409            for line in f:
410                lineno += 1
411                line = line.strip()
412                if len(line) == 0 or line.startswith('#'):
413                    continue
414                m = project_line.match(line)
415                if m:
416                    fid = fedid(hexstr=m.group(1))
417                    project, user = m.group(2,3)
418                    if not self.accessdb.has_key(fid):
419                        self.accessdb[fid] = []
420                    self.accessdb[fid].append((project, user))
421                    continue
422
423                m = user_line.match(line)
424                if m:
425                    fid = fedid(hexstr=m.group(1))
426                    project = None
427                    user = m.group(2)
428                    if not self.accessdb.has_key(fid):
429                        self.accessdb[fid] = []
430                    self.accessdb[fid].append((project, user))
431                    continue
432                self.log.warn("[experiment_control] Error parsing access " +\
433                        "db %s at line %d" %  (accessdb_file, lineno))
434        except IOError:
435            raise service_error(service_error.internal,
436                    "Error opening/reading %s as experiment " +\
437                            "control accessdb" %  accessdb_file)
438        f.close()
439
440        # Initialize the authorization attributes
441        for fid in self.accessdb.keys():
442            self.auth.set_attribute(fid, 'create')
443
444    def read_mapdb(self, file):
445        """
446        Read a simple colon separated list of mappings for the
447        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
448        """
449
450        self.tbmap = { }
451        lineno =0
452        try:
453            f = open(file, "r")
454            for line in f:
455                lineno += 1
456                line = line.strip()
457                if line.startswith('#') or len(line) == 0:
458                    continue
459                try:
460                    label, url = line.split(':', 1)
461                    self.tbmap[label] = url
462                except ValueError, e:
463                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
464                            "map db: %s %s" % (lineno, line, e))
465        except IOError, e:
466            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
467                    "open %s: %s" % (file, e))
468        f.close()
469
470    def scp_file(self, file, user, host, dest=""):
471        """
472        scp a file to the remote host.  If debug is set the action is only
473        logged.
474        """
475
476        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
477                '-o', 'StrictHostKeyChecking yes', '-i', 
478                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
479        rv = 0
480
481        try:
482            dnull = open("/dev/null", "w")
483        except IOError:
484            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
485            dnull = Null
486
487        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
488        if not self.debug:
489            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
490
491        return rv == 0
492
493    def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
494        """
495        Run a remote command on host as user.  If debug is set, the action is
496        only logged.
497        """
498        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
499                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
500                (self.ssh_exec, self.ssh_privkey_file, 
501                        user, host, cmd)
502
503        try:
504            dnull = open("/dev/null", "r")
505        except IOError:
506            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
507            dnull = Null
508
509        self.log.debug("[ssh_cmd]: %s" % sh_str)
510        if not self.debug:
511            if dnull:
512                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
513            else:
514                sub = Popen(sh_str, shell=True)
515            if timeout:
516                i = 0
517                rv = sub.poll()
518                while i < timeout:
519                    if rv is not None: break
520                    else:
521                        time.sleep(1)
522                        rv = sub.poll()
523                        i += 1
524                else:
525                    self.log.debug("Process exceeded runtime: %s" % sh_str)
526                    os.kill(sub.pid, signal.SIGKILL)
527                    raise self.ssh_cmd_timeout();
528                return rv == 0
529            else:
530                return sub.wait() == 0
531        else:
532            return True
533
534    def ship_configs(self, host, user, src_dir, dest_dir):
535        """
536        Copy federant-specific configuration files to the federant.
537        """
538        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
539            return False
540        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
541            return False
542
543        for f in os.listdir(src_dir):
544            if os.path.isdir(f):
545                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
546                        "%s/%s" % (dest_dir, f)):
547                    return False
548            else:
549                if not self.scp_file("%s/%s" % (src_dir, f), 
550                        user, host, dest_dir):
551                    return False
552        return True
553
554    def get_state(self, user, host, ssh_key, tb, pid, eid):
555        # command to test experiment state
556        expinfo_exec = "/usr/testbed/bin/expinfo" 
557        # Regular expressions to parse the expinfo response
558        state_re = re.compile("State:\s+(\w+)")
559        no_exp_re = re.compile("^No\s+such\s+experiment")
560        swapping_re = re.compile("^No\s+information\s+available.")
561        state = None    # Experiment state parsed from expinfo
562        # The expinfo ssh command.  Note the identity restriction to use only
563        # the identity provided in the pubkey given.
564        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
565                'StrictHostKeyChecking yes', '-i', 
566                ssh_key, "%s@%s" % (user, host), 
567                expinfo_exec, pid, eid]
568
569        dev_null = None
570        try:
571            dev_null = open("/dev/null", "a")
572        except IOError, e:
573            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
574
575        if self.debug:
576            state = 'swapped'
577            rv = 0
578        else:
579            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
580            for line in status.stdout:
581                m = state_re.match(line)
582                if m: state = m.group(1)
583                else:
584                    for reg, st in ((no_exp_re, "none"),
585                            (swapping_re, "swapping")):
586                        m = reg.match(line)
587                        if m: state = st
588            rv = status.wait()
589
590        # If the experiment is not present the subcommand returns a non-zero
591        # return value.  If we successfully parsed a "none" outcome, ignore the
592        # return code.
593        if rv != 0 and state != 'none':
594            raise service_error(service_error.internal,
595                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
596        elif state not in ('active', 'swapped', 'swapping', 'none'):
597            raise service_error(service_error.internal,
598                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
599        else: return state
600
601
602    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
603        """
604        Start a sub-experiment on a federant.
605
606        Get the current state, modify or create as appropriate, ship data and
607        configs and start the experiment.  There are small ordering differences
608        based on the initial state of the sub-experiment.
609        """
610        # ops node in the federant
611        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
612        user = tbparams[tb]['user']     # federant user
613        pid = tbparams[tb]['project']   # federant project
614        # XXX
615        base_confs = ( "hosts",)
616        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
617        # Configuration directories on the remote machine
618        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
619        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
620        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
621
622        state = self.get_state(user, host, self.ssh_privkey_file, tb, pid, eid)
623
624        self.log.debug("[start_segment]: %s: %s" % (tb, state))
625        self.log.info("[start_segment]:transferring experiment to %s" % tb)
626
627        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
628            return False
629       
630        if state == 'none':
631            # Create a null copy of the experiment so that we capture any logs
632            # there if the modify fails.  Emulab software discards the logs
633            # from a failed startexp
634            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
635                return False
636            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
637            timedout = False
638            try:
639                if not self.ssh_cmd(user, host,
640                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
641                        "-e %s null.tcl") % (pid, eid), "startexp",
642                        timeout=60 * 10):
643                    return False
644            except self.ssh_cmd_timeout:
645                timedout = True
646
647            if timedout:
648                state = self.get_state(user, host, self.ssh_privkey_file, 
649                        tb, eid, pid)
650                if state != "swapped":
651                    return False
652
653       
654        # Open up a temporary file to contain a script for setting up the
655        # filespace for the new experiment.
656        self.log.info("[start_segment]: creating script file")
657        try:
658            sf, scriptname = tempfile.mkstemp()
659            scriptfile = os.fdopen(sf, 'w')
660        except IOError:
661            return False
662
663        scriptbase = os.path.basename(scriptname)
664
665        # Script the filesystem changes
666        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
667        # Clear and create the tarfiles and rpm directories
668        for d in (tarfiles_dir, rpms_dir):
669            print >>scriptfile, "/bin/rm -rf %s/*" % d
670            print >>scriptfile, "mkdir -p %s" % d
671        print >>scriptfile, 'mkdir -p %s' % proj_dir
672        print >>scriptfile, "rm -f %s" % scriptbase
673        scriptfile.close()
674
675        # Move the script to the remote machine
676        # XXX: could collide tempfile names on the remote host
677        if self.scp_file(scriptname, user, host, scriptbase):
678            os.remove(scriptname)
679        else:
680            return False
681
682        # Execute the script (and the script's last line deletes it)
683        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
684            return False
685
686        for f in base_confs:
687            if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
688                    "%s/%s" % (proj_dir, f)):
689                return False
690        if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
691                proj_dir):
692            return False
693        if os.path.isdir("%s/tarfiles" % tmpdir):
694            if not self.ship_configs(host, user,
695                    "%s/tarfiles" % tmpdir, tarfiles_dir):
696                return False
697        if os.path.isdir("%s/rpms" % tmpdir):
698            if not self.ship_configs(host, user,
699                    "%s/rpms" % tmpdir, tarfiles_dir):
700                return False
701        # Stage the new configuration (active experiments will stay swapped in
702        # now)
703        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
704        try:
705            if not self.ssh_cmd(user, host,
706                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
707                            (pid, eid, tclfile),
708                    "modexp", timeout= 60 * 10):
709                return False
710        except self.ssh_cmd_timeout:
711            print "modexp timeout"
712            # There's really no way to see if this succeeded or failed, so if
713            # it hangs, assume the worst.
714            return False
715        # Active experiments are still swapped, this swaps the others in.
716        if state != 'active':
717            self.log.info("[start_segment]: Swapping %s in on %s" % \
718                    (eid, tb))
719            timedout = False
720            try:
721                if not self.ssh_cmd(user, host,
722                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
723                        "swapexp", timeout=10*60):
724                    return False
725            except self.ssh_cmd_timeout:
726                timedout = True
727           
728            # If the command was terminated, but completed successfully, report
729            # success.
730            if timedout:
731                state = self.get_state(user, host, self.ssh_privkey_file,
732                        tb, eid, pid)
733                self.log.debug("[start_segment]: swapin timed out (state)")
734                return state == 'active'
735        # Everything has gone OK.
736        return True
737
738    def stop_segment(self, tb, eid, tbparams):
739        """
740        Stop a sub experiment by calling swapexp on the federant
741        """
742        user = tbparams[tb]['user']
743        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
744        pid = tbparams[tb]['project']
745
746        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
747        return self.ssh_cmd(user, host,
748                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
749
750       
751    def generate_ssh_keys(self, dest, type="rsa" ):
752        """
753        Generate a set of keys for the gateways to use to talk.
754
755        Keys are of type type and are stored in the required dest file.
756        """
757        valid_types = ("rsa", "dsa")
758        t = type.lower();
759        if t not in valid_types: raise ValueError
760        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
761
762        try:
763            trace = open("/dev/null", "w")
764        except IOError:
765            raise service_error(service_error.internal,
766                    "Cannot open /dev/null??");
767
768        # May raise CalledProcessError
769        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
770        rv = call(cmd, stdout=trace, stderr=trace)
771        if rv != 0:
772            raise service_error(service_error.internal, 
773                    "Cannot generate nonce ssh keys.  %s return code %d" \
774                            % (self.ssh_keygen, rv))
775
776    def gentopo(self, str):
777        """
778        Generate the topology dtat structure from the splitter's XML
779        representation of it.
780
781        The topology XML looks like:
782            <experiment>
783                <nodes>
784                    <node><vname></vname><ips>ip1:ip2</ips></node>
785                </nodes>
786                <lans>
787                    <lan>
788                        <vname></vname><vnode></vnode><ip></ip>
789                        <bandwidth></bandwidth><member>node:port</member>
790                    </lan>
791                </lans>
792        """
793        class topo_parse:
794            """
795            Parse the topology XML and create the dats structure.
796            """
797            def __init__(self):
798                # Typing of the subelements for data conversion
799                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
800                self.int_subelements = ( 'bandwidth',)
801                self.float_subelements = ( 'delay',)
802                # The final data structure
803                self.nodes = [ ]
804                self.lans =  [ ]
805                self.topo = { \
806                        'node': self.nodes,\
807                        'lan' : self.lans,\
808                    }
809                self.element = { }  # Current element being created
810                self.chars = ""     # Last text seen
811
812            def end_element(self, name):
813                # After each sub element the contents is added to the current
814                # element or to the appropriate list.
815                if name == 'node':
816                    self.nodes.append(self.element)
817                    self.element = { }
818                elif name == 'lan':
819                    self.lans.append(self.element)
820                    self.element = { }
821                elif name in self.str_subelements:
822                    self.element[name] = self.chars
823                    self.chars = ""
824                elif name in self.int_subelements:
825                    self.element[name] = int(self.chars)
826                    self.chars = ""
827                elif name in self.float_subelements:
828                    self.element[name] = float(self.chars)
829                    self.chars = ""
830
831            def found_chars(self, data):
832                self.chars += data.rstrip()
833
834
835        tp = topo_parse();
836        parser = xml.parsers.expat.ParserCreate()
837        parser.EndElementHandler = tp.end_element
838        parser.CharacterDataHandler = tp.found_chars
839
840        parser.Parse(str)
841
842        return tp.topo
843       
844
845    def genviz(self, topo):
846        """
847        Generate the visualization the virtual topology
848        """
849
850        neato = "/usr/local/bin/neato"
851        # These are used to parse neato output and to create the visualization
852        # file.
853        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
854        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
855                "%s</type></node>"
856
857        try:
858            # Node names
859            nodes = [ n['vname'] for n in topo['node'] ]
860            topo_lans = topo['lan']
861        except KeyError:
862            raise service_error(service_error.internal, "Bad topology")
863
864        lans = { }
865        links = { }
866
867        # Walk through the virtual topology, organizing the connections into
868        # 2-node connections (links) and more-than-2-node connections (lans).
869        # When a lan is created, it's added to the list of nodes (there's a
870        # node in the visualization for the lan).
871        for l in topo_lans:
872            if links.has_key(l['vname']):
873                if len(links[l['vname']]) < 2:
874                    links[l['vname']].append(l['vnode'])
875                else:
876                    nodes.append(l['vname'])
877                    lans[l['vname']] = links[l['vname']]
878                    del links[l['vname']]
879                    lans[l['vname']].append(l['vnode'])
880            elif lans.has_key(l['vname']):
881                lans[l['vname']].append(l['vnode'])
882            else:
883                links[l['vname']] = [ l['vnode'] ]
884
885
886        # Open up a temporary file for dot to turn into a visualization
887        try:
888            df, dotname = tempfile.mkstemp()
889            dotfile = os.fdopen(df, 'w')
890        except IOError:
891            raise service_error(service_error.internal,
892                    "Failed to open file in genviz")
893
894        # Generate a dot/neato input file from the links, nodes and lans
895        try:
896            print >>dotfile, "graph G {"
897            for n in nodes:
898                print >>dotfile, '\t"%s"' % n
899            for l in links.keys():
900                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
901            for l in lans.keys():
902                for n in lans[l]:
903                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
904            print >>dotfile, "}"
905            dotfile.close()
906        except TypeError:
907            raise service_error(service_error.internal,
908                    "Single endpoint link in vtopo")
909        except IOError:
910            raise service_error(service_error.internal, "Cannot write dot file")
911
912        # Use dot to create a visualization
913        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
914                '-Gpack=true', dotname], stdout=PIPE)
915
916        # Translate dot to vis format
917        vis_nodes = [ ]
918        vis = { 'node': vis_nodes }
919        for line in dot.stdout:
920            m = vis_re.match(line)
921            if m:
922                vn = m.group(1)
923                vis_node = {'name': vn, \
924                        'x': float(m.group(2)),\
925                        'y' : float(m.group(3)),\
926                    }
927                if vn in links.keys() or vn in lans.keys():
928                    vis_node['type'] = 'lan'
929                else:
930                    vis_node['type'] = 'node'
931                vis_nodes.append(vis_node)
932        rv = dot.wait()
933
934        os.remove(dotname)
935        if rv == 0 : return vis
936        else: return None
937
938    def get_access(self, tb, nodes, user, tbparam, master, export_project,
939            access_user):
940        """
941        Get access to testbed through fedd and set the parameters for that tb
942        """
943        uri = self.tbmap.get(tb, None)
944        if not uri:
945            raise service_error(serice_error.server_config, 
946                    "Unknown testbed: %s" % tb)
947
948        # currently this lumps all users into one service access group
949        service_keys = [ a for u in user \
950                for a in u.get('access', []) \
951                    if a.has_key('sshPubkey')]
952
953        if len(service_keys) == 0:
954            raise service_error(service_error.req, 
955                    "Must have at least one SSH pubkey for services")
956
957
958        for p, u in access_user:
959            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
960                    "to %s") %  ((p or "None"), u, uri))
961
962            if p:
963                # Request with user and project specified
964                req = {\
965                        'destinationTestbed' : { 'uri' : uri },
966                        'project': { 
967                            'name': {'localname': p},
968                            'user': [ {'userID': { 'localname': u } } ],
969                            },
970                        'user':  user,
971                        'allocID' : { 'localname': 'test' },
972                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
973                        'serviceAccess' : service_keys
974                    }
975            else:
976                # Request with only user specified
977                req = {\
978                        'destinationTestbed' : { 'uri' : uri },
979                        'user':  [ {'userID': { 'localname': u } } ],
980                        'allocID' : { 'localname': 'test' },
981                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
982                        'serviceAccess' : service_keys
983                    }
984
985            if tb == master:
986                # NB, the export_project parameter is a dict that includes
987                # the type
988                req['exportProject'] = export_project
989
990            # node resources if any
991            if nodes != None and len(nodes) > 0:
992                rnodes = [ ]
993                for n in nodes:
994                    rn = { }
995                    image, hw, count = n.split(":")
996                    if image: rn['image'] = [ image ]
997                    if hw: rn['hardware'] = [ hw ]
998                    if count and int(count) >0 : rn['count'] = int(count)
999                    rnodes.append(rn)
1000                req['resources']= { }
1001                req['resources']['node'] = rnodes
1002
1003            try:
1004                if self.local_access.has_key(uri):
1005                    # Local access call
1006                    req = { 'RequestAccessRequestBody' : req }
1007                    r = self.local_access[uri].RequestAccess(req, 
1008                            fedid(file=self.cert_file))
1009                    r = { 'RequestAccessResponseBody' : r }
1010                else:
1011                    r = self.call_RequestAccess(uri, req, 
1012                            self.cert_file, self.cert_pwd, self.trusted_certs)
1013            except service_error, e:
1014                if e.code == service_error.access:
1015                    self.log.debug("[get_access] Access denied")
1016                    r = None
1017                    continue
1018                else:
1019                    raise e
1020
1021            if r.has_key('RequestAccessResponseBody'):
1022                # Through to here we have a valid response, not a fault.
1023                # Access denied is a fault, so something better or worse than
1024                # access denied has happened.
1025                r = r['RequestAccessResponseBody']
1026                self.log.debug("[get_access] Access granted")
1027                break
1028            else:
1029                raise service_error(service_error.protocol,
1030                        "Bad proxy response")
1031       
1032        if not r:
1033            raise service_error(service_error.access, 
1034                    "Access denied by %s (%s)" % (tb, uri))
1035
1036        e = r['emulab']
1037        p = e['project']
1038        tbparam[tb] = { 
1039                "boss": e['boss'],
1040                "host": e['ops'],
1041                "domain": e['domain'],
1042                "fs": e['fileServer'],
1043                "eventserver": e['eventServer'],
1044                "project": unpack_id(p['name']),
1045                "emulab" : e,
1046                "allocID" : r['allocID'],
1047                }
1048        # Make the testbed name be the label the user applied
1049        p['testbed'] = {'localname': tb }
1050
1051        for u in p['user']:
1052            role = u.get('role', None)
1053            if role == 'experimentCreation':
1054                tbparam[tb]['user'] = unpack_id(u['userID'])
1055                break
1056        else:
1057            raise service_error(service_error.internal, 
1058                    "No createExperimentUser from %s" %tb)
1059
1060        # Add attributes to barameter space.  We don't allow attributes to
1061        # overlay any parameters already installed.
1062        for a in e['fedAttr']:
1063            try:
1064                if a['attribute'] and isinstance(a['attribute'], basestring)\
1065                        and not tbparam[tb].has_key(a['attribute'].lower()):
1066                    tbparam[tb][a['attribute'].lower()] = a['value']
1067            except KeyError:
1068                self.log.error("Bad attribute in response: %s" % a)
1069       
1070    def release_access(self, tb, aid):
1071        """
1072        Release access to testbed through fedd
1073        """
1074
1075        uri = self.tbmap.get(tb, None)
1076        if not uri:
1077            raise service_error(serice_error.server_config, 
1078                    "Unknown testbed: %s" % tb)
1079
1080        if self.local_access.has_key(uri):
1081            resp = self.local_access[uri].ReleaseAccess(\
1082                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1083                    fedid(file=self.cert_file))
1084            resp = { 'ReleaseAccessResponseBody': resp } 
1085        else:
1086            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1087                    self.cert_file, self.cert_pwd, self.trusted_certs)
1088
1089        # better error coding
1090
1091    def remote_splitter(self, uri, desc, master):
1092
1093        req = {
1094                'description' : { 'ns2description': desc },
1095                'master': master,
1096                'include_fedkit': bool(self.fedkit),
1097                'include_gatewaykit': bool(self.gatewaykit)
1098            }
1099
1100        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1101                self.trusted_certs)
1102
1103        if r.has_key('Ns2SplitResponseBody'):
1104            r = r['Ns2SplitResponseBody']
1105            if r.has_key('output'):
1106                return r['output'].splitlines()
1107            else:
1108                raise service_error(service_error.protocol, 
1109                        "Bad splitter response (no output)")
1110        else:
1111            raise service_error(service_error.protocol, "Bad splitter response")
1112       
1113    class current_testbed:
1114        """
1115        Object for collecting the current testbed description.  The testbed
1116        description is saved to a file with the local testbed variables
1117        subsittuted line by line.
1118        """
1119        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1120            def tar_list_to_string(tl):
1121                if tl is None: return None
1122
1123                rv = ""
1124                for t in tl:
1125                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1126                            (t[0], os.path.basename(t[1]))
1127                return rv
1128
1129
1130            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1131            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1132            self.current_testbed = None
1133            self.testbed_file = None
1134
1135            self.def_expstart = \
1136                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1137            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1138            self.def_gwstart = \
1139                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1140            self.def_mgwstart = \
1141                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1142            self.def_gwimage = "FBSD61-TUNNEL2";
1143            self.def_gwtype = "pc";
1144            self.def_mgwcmd = '# '
1145            self.def_mgwcmdparams = ''
1146            self.def_gwcmd = '# '
1147            self.def_gwcmdparams = ''
1148
1149            self.eid = eid
1150            self.tmpdir = tmpdir
1151            # Convert fedkit and gateway kit (which are lists of tuples) into a
1152            # substituition string.
1153            self.fedkit = tar_list_to_string(fedkit)
1154            self.gatewaykit = tar_list_to_string(gatewaykit)
1155
1156        def __call__(self, line, master, allocated, tbparams):
1157            # Capture testbed topology descriptions
1158            if self.current_testbed == None:
1159                m = self.begin_testbed.match(line)
1160                if m != None:
1161                    self.current_testbed = m.group(1)
1162                    if self.current_testbed == None:
1163                        raise service_error(service_error.req,
1164                                "Bad request format (unnamed testbed)")
1165                    allocated[self.current_testbed] = \
1166                            allocated.get(self.current_testbed,0) + 1
1167                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1168                    if not os.path.exists(tb_dir):
1169                        try:
1170                            os.mkdir(tb_dir)
1171                        except IOError:
1172                            raise service_error(service_error.internal,
1173                                    "Cannot create %s" % tb_dir)
1174                    try:
1175                        self.testbed_file = open("%s/%s.%s.tcl" %
1176                                (tb_dir, self.eid, self.current_testbed), 'w')
1177                    except IOError:
1178                        self.testbed_file = None
1179                    return True
1180                else: return False
1181            else:
1182                m = self.end_testbed.match(line)
1183                if m != None:
1184                    if m.group(1) != self.current_testbed:
1185                        raise service_error(service_error.internal, 
1186                                "Mismatched testbed markers!?")
1187                    if self.testbed_file != None: 
1188                        self.testbed_file.close()
1189                        self.testbed_file = None
1190                    self.current_testbed = None
1191                elif self.testbed_file:
1192                    # Substitute variables and put the line into the local
1193                    # testbed file.
1194                    gwtype = tbparams[self.current_testbed].get(\
1195                            'connectortype', self.def_gwtype)
1196                    gwimage = tbparams[self.current_testbed].get(\
1197                            'connectorimage', self.def_gwimage)
1198                    mgwstart = tbparams[self.current_testbed].get(\
1199                            'masterconnectorstartcmd', self.def_mgwstart)
1200                    mexpstart = tbparams[self.current_testbed].get(\
1201                            'masternodestartcmd', self.def_mexpstart)
1202                    gwstart = tbparams[self.current_testbed].get(\
1203                            'slaveconnectorstartcmd', self.def_gwstart)
1204                    expstart = tbparams[self.current_testbed].get(\
1205                            'slavenodestartcmd', self.def_expstart)
1206                    project = tbparams[self.current_testbed].get('project')
1207                    gwcmd = tbparams[self.current_testbed].get(\
1208                            'slaveconnectorcmd', self.def_gwcmd)
1209                    gwcmdparams = tbparams[self.current_testbed].get(\
1210                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1211                    mgwcmd = tbparams[self.current_testbed].get(\
1212                            'masterconnectorcmd', self.def_gwcmd)
1213                    mgwcmdparams = tbparams[self.current_testbed].get(\
1214                            'masterconnectorcmdparams', self.def_gwcmdparams)
1215                    line = re.sub("GWTYPE", gwtype, line)
1216                    line = re.sub("GWIMAGE", gwimage, line)
1217                    if self.current_testbed == master:
1218                        line = re.sub("GWSTART", mgwstart, line)
1219                        line = re.sub("EXPSTART", mexpstart, line)
1220                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1221                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1222                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1223                    else:
1224                        line = re.sub("GWSTART", gwstart, line)
1225                        line = re.sub("EXPSTART", expstart, line)
1226                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1227                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1228                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1229                    #These expansions contain EID and PROJDIR.  NB these are
1230                    # local fedkit and gatewaykit, which are strings.
1231                    if self.fedkit:
1232                        line = re.sub("FEDKIT", self.fedkit, line)
1233                    if self.gatewaykit:
1234                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1235                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1236                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1237                    line = re.sub("EID", self.eid, line)
1238                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1239                            (project, self.eid), line)
1240                    print >>self.testbed_file, line
1241                return True
1242
1243    class allbeds:
1244        """
1245        Process the Allbeds section.  Get access to each federant and save the
1246        parameters in tbparams
1247        """
1248        def __init__(self, get_access):
1249            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1250            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1251            self.in_allbeds = False
1252            self.get_access = get_access
1253
1254        def __call__(self, line, user, tbparams, master, export_project,
1255                access_user):
1256            # Testbed access parameters
1257            if not self.in_allbeds:
1258                if self.begin_allbeds.match(line):
1259                    self.in_allbeds = True
1260                    return True
1261                else:
1262                    return False
1263            else:
1264                if self.end_allbeds.match(line):
1265                    self.in_allbeds = False
1266                else:
1267                    nodes = line.split('|')
1268                    tb = nodes.pop(0)
1269                    self.get_access(tb, nodes, user, tbparams, master,
1270                            export_project, access_user)
1271                return True
1272
1273    class gateways:
1274        def __init__(self, eid, master, tmpdir, gw_pubkey,
1275                gw_secretkey, copy_file, fedkit):
1276            self.begin_gateways = \
1277                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1278            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1279            self.current_gateways = None
1280            self.control_gateway = None
1281            self.active_end = { }
1282
1283            self.eid = eid
1284            self.master = master
1285            self.tmpdir = tmpdir
1286            self.gw_pubkey_base = gw_pubkey
1287            self.gw_secretkey_base = gw_secretkey
1288
1289            self.copy_file = copy_file
1290            self.fedkit = fedkit
1291
1292
1293        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1294                active_end, tbparams, dtb, myname, desthost, type):
1295            """
1296            Produce a gateway configuration file from a gateways line.
1297            """
1298
1299            sproject = tbparams[gw].get('project', 'project')
1300            dproject = tbparams[dtb].get('project', 'project')
1301            sdomain = ".%s.%s%s" % (eid, sproject,
1302                    tbparams[gw].get('domain', ".example.com"))
1303            ddomain = ".%s.%s%s" % (eid, dproject,
1304                    tbparams[dtb].get('domain', ".example.com"))
1305            boss = tbparams[master].get('boss', "boss")
1306            fs = tbparams[master].get('fs', "fs")
1307            event_server = "%s%s" % \
1308                    (tbparams[gw].get('eventserver', "event_server"),
1309                            tbparams[gw].get('domain', "example.com"))
1310            remote_event_server = "%s%s" % \
1311                    (tbparams[dtb].get('eventserver', "event_server"),
1312                            tbparams[dtb].get('domain', "example.com"))
1313            seer_control = "%s%s" % \
1314                    (tbparams[gw].get('control', "control"), sdomain)
1315            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1316
1317            if self.fedkit:
1318                remote_script_dir = "/usr/local/federation/bin"
1319                local_script_dir = "/usr/local/federation/bin"
1320            else:
1321                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1322                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1323
1324            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1325            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1326            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1327
1328            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1329            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1330
1331            # translate to lower case so the `hostname` hack for specifying
1332            # configuration files works.
1333            conf_file = conf_file.lower();
1334            remote_conf_file = remote_conf_file.lower();
1335
1336            if dtb == master:
1337                active = "false"
1338            elif gw == master:
1339                active = "true"
1340            elif active_end.has_key('%s-%s' % (dtb, gw)):
1341                active = "false"
1342            else:
1343                active_end['%s-%s' % (gw, dtb)] = 1
1344                active = "true"
1345
1346            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1347            print >>gwconfig, "Active: %s" % active
1348            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1349            if tunnel_iface:
1350                print >>gwconfig, "Interface: %s" % tunnel_iface
1351            print >>gwconfig, "BossName: %s" % boss
1352            print >>gwconfig, "FsName: %s" % fs
1353            print >>gwconfig, "EventServerName: %s" % event_server
1354            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1355            print >>gwconfig, "SeerControl: %s" % seer_control
1356            print >>gwconfig, "Type: %s" % type
1357            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1358            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1359                    local_script_dir
1360            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1361            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1362            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1363                    (remote_conf_dir, remote_conf_file)
1364            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1365            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1366            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1367            gwconfig.close()
1368
1369            return active == "true"
1370
1371        def __call__(self, line, allocated, tbparams):
1372            # Process gateways
1373            if not self.current_gateways:
1374                m = self.begin_gateways.match(line)
1375                if m:
1376                    self.current_gateways = m.group(1)
1377                    if allocated.has_key(self.current_gateways):
1378                        # This test should always succeed
1379                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1380                        if not os.path.exists(tb_dir):
1381                            try:
1382                                os.mkdir(tb_dir)
1383                            except IOError:
1384                                raise service_error(service_error.internal,
1385                                        "Cannot create %s" % tb_dir)
1386                    else:
1387                        # XXX
1388                        self.log.error("[gateways]: Ignoring gateways for " + \
1389                                "unknown testbed %s" % self.current_gateways)
1390                        self.current_gateways = None
1391                    return True
1392                else:
1393                    return False
1394            else:
1395                m = self.end_gateways.match(line)
1396                if m :
1397                    if m.group(1) != self.current_gateways:
1398                        raise service_error(service_error.internal,
1399                                "Mismatched gateway markers!?")
1400                    if self.control_gateway:
1401                        try:
1402                            cc = open("%s/%s/client.conf" %
1403                                    (self.tmpdir, self.current_gateways), 'w')
1404                            print >>cc, "ControlGateway: %s" % \
1405                                    self.control_gateway
1406                            if tbparams[self.master].has_key('smbshare'):
1407                                print >>cc, "SMBSHare: %s" % \
1408                                        tbparams[self.master]['smbshare']
1409                            print >>cc, "ProjectUser: %s" % \
1410                                    tbparams[self.master]['user']
1411                            print >>cc, "ProjectName: %s" % \
1412                                    tbparams[self.master]['project']
1413                            print >>cc, "ExperimentID: %s/%s" % \
1414                                    ( tbparams[self.master]['project'], \
1415                                    self.eid )
1416                            cc.close()
1417                        except IOError:
1418                            raise service_error(service_error.internal,
1419                                    "Error creating client config")
1420                        # XXX: This seer specific file should disappear
1421                        try:
1422                            cc = open("%s/%s/seer.conf" %
1423                                    (self.tmpdir, self.current_gateways),
1424                                    'w')
1425                            if self.current_gateways != self.master:
1426                                print >>cc, "ControlNode: %s" % \
1427                                        self.control_gateway
1428                            print >>cc, "ExperimentID: %s/%s" % \
1429                                    ( tbparams[self.master]['project'], \
1430                                    self.eid )
1431                            cc.close()
1432                        except IOError:
1433                            raise service_error(service_error.internal,
1434                                    "Error creating seer config")
1435                    else:
1436                        debug.error("[gateways]: No control gateway for %s" %\
1437                                    self.current_gateways)
1438                    self.current_gateways = None
1439                else:
1440                    dtb, myname, desthost, type = line.split(" ")
1441
1442                    if type == "control" or type == "both":
1443                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1444                                self.eid, 
1445                                tbparams[self.current_gateways]['project'],
1446                                tbparams[self.current_gateways]['domain'])
1447                    try:
1448                        active = self.gateway_conf_file(self.current_gateways,
1449                                self.master, self.eid, self.gw_pubkey_base,
1450                                self.gw_secretkey_base,
1451                                self.active_end, tbparams, dtb, myname,
1452                                desthost, type)
1453                    except IOError, e:
1454                        raise service_error(service_error.internal,
1455                                "Failed to write config file for %s" % \
1456                                        self.current_gateway)
1457           
1458                    gw_pubkey = "%s/keys/%s" % \
1459                            (self.tmpdir, self.gw_pubkey_base)
1460                    gw_secretkey = "%s/keys/%s" % \
1461                            (self.tmpdir, self.gw_secretkey_base)
1462
1463                    pkfile = "%s/%s/%s" % \
1464                            ( self.tmpdir, self.current_gateways, 
1465                                    self.gw_pubkey_base)
1466                    skfile = "%s/%s/%s" % \
1467                            ( self.tmpdir, self.current_gateways, 
1468                                    self.gw_secretkey_base)
1469
1470                    if not os.path.exists(pkfile):
1471                        try:
1472                            self.copy_file(gw_pubkey, pkfile)
1473                        except IOError:
1474                            service_error(service_error.internal,
1475                                    "Failed to copy pubkey file")
1476
1477                    if active and not os.path.exists(skfile):
1478                        try:
1479                            self.copy_file(gw_secretkey, skfile)
1480                        except IOError:
1481                            service_error(service_error.internal,
1482                                    "Failed to copy secretkey file")
1483                return True
1484
1485    class shunt_to_file:
1486        """
1487        Simple class to write data between two regexps to a file.
1488        """
1489        def __init__(self, begin, end, filename):
1490            """
1491            Begin shunting on a match of begin, stop on end, send data to
1492            filename.
1493            """
1494            self.begin = re.compile(begin)
1495            self.end = re.compile(end)
1496            self.in_shunt = False
1497            self.file = None
1498            self.filename = filename
1499
1500        def __call__(self, line):
1501            """
1502            Call this on each line in the input that may be shunted.
1503            """
1504            if not self.in_shunt:
1505                if self.begin.match(line):
1506                    self.in_shunt = True
1507                    try:
1508                        self.file = open(self.filename, "w")
1509                    except:
1510                        self.file = None
1511                        raise
1512                    return True
1513                else:
1514                    return False
1515            else:
1516                if self.end.match(line):
1517                    if self.file: 
1518                        self.file.close()
1519                        self.file = None
1520                    self.in_shunt = False
1521                else:
1522                    if self.file:
1523                        print >>self.file, line
1524                return True
1525
1526    class shunt_to_list:
1527        """
1528        Same interface as shunt_to_file.  Data collected in self.list, one list
1529        element per line.
1530        """
1531        def __init__(self, begin, end):
1532            self.begin = re.compile(begin)
1533            self.end = re.compile(end)
1534            self.in_shunt = False
1535            self.list = [ ]
1536       
1537        def __call__(self, line):
1538            if not self.in_shunt:
1539                if self.begin.match(line):
1540                    self.in_shunt = True
1541                    return True
1542                else:
1543                    return False
1544            else:
1545                if self.end.match(line):
1546                    self.in_shunt = False
1547                else:
1548                    self.list.append(line)
1549                return True
1550
1551    class shunt_to_string:
1552        """
1553        Same interface as shunt_to_file.  Data collected in self.str, all in
1554        one string.
1555        """
1556        def __init__(self, begin, end):
1557            self.begin = re.compile(begin)
1558            self.end = re.compile(end)
1559            self.in_shunt = False
1560            self.str = ""
1561       
1562        def __call__(self, line):
1563            if not self.in_shunt:
1564                if self.begin.match(line):
1565                    self.in_shunt = True
1566                    return True
1567                else:
1568                    return False
1569            else:
1570                if self.end.match(line):
1571                    self.in_shunt = False
1572                else:
1573                    self.str += line
1574                return True
1575
1576    def create_experiment(self, req, fid):
1577        """
1578        The external interface to experiment creation called from the
1579        dispatcher.
1580
1581        Creates a working directory, splits the incoming description using the
1582        splitter script and parses out the avrious subsections using the
1583        lcasses above.  Once each sub-experiment is created, use pooled threads
1584        to instantiate them and start it all up.
1585        """
1586
1587        if not self.auth.check_attribute(fid, 'create'):
1588            raise service_error(service_error.access, "Create access denied")
1589
1590        try:
1591            tmpdir = tempfile.mkdtemp(prefix="split-")
1592        except IOError:
1593            raise service_error(service_error.internal, "Cannot create tmp dir")
1594
1595        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1596        gw_secretkey_base = "fed.%s" % self.ssh_type
1597        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1598        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1599        tclfile = tmpdir + "/experiment.tcl"
1600        tbparams = { }
1601        try:
1602            access_user = self.accessdb[fid]
1603        except KeyError:
1604            raise service_error(service_error.internal,
1605                    "Access map and authorizer out of sync in " + \
1606                            "create_experiment for fedid %s"  % fid)
1607
1608        pid = "dummy"
1609        gid = "dummy"
1610        # XXX
1611        fail_soft = False
1612
1613        try:
1614            os.mkdir(tmpdir+"/keys")
1615        except OSError:
1616            raise service_error(service_error.internal,
1617                    "Can't make temporary dir")
1618
1619        req = req.get('CreateRequestBody', None)
1620        if not req:
1621            raise service_error(service_error.req,
1622                    "Bad request format (no CreateRequestBody)")
1623        # The tcl parser needs to read a file so put the content into that file
1624        descr=req.get('experimentdescription', None)
1625        if descr:
1626            file_content=descr.get('ns2description', None)
1627            if file_content:
1628                try:
1629                    f = open(tclfile, 'w')
1630                    f.write(file_content)
1631                    f.close()
1632                except IOError:
1633                    raise service_error(service_error.internal,
1634                            "Cannot write temp experiment description")
1635            else:
1636                raise service_error(service_error.req, 
1637                        "Only ns2descriptions supported")
1638        else:
1639            raise service_error(service_error.req, "No experiment description")
1640
1641        if req.has_key('experimentID') and \
1642                req['experimentID'].has_key('localname'):
1643            eid = req['experimentID']['localname']
1644            self.state_lock.acquire()
1645            while (self.state.has_key(eid)):
1646                eid += random.choice(string.ascii_letters)
1647            # To avoid another thread picking this localname
1648            self.state[eid] = "placeholder"
1649            self.state_lock.release()
1650        else:
1651            eid = self.exp_stem
1652            for i in range(0,5):
1653                eid += random.choice(string.ascii_letters)
1654            self.state_lock.acquire()
1655            while (self.state.has_key(eid)):
1656                eid = self.exp_stem
1657                for i in range(0,5):
1658                    eid += random.choice(string.ascii_letters)
1659            # To avoid another thread picking this localname
1660            self.state[eid] = "placeholder"
1661            self.state_lock.release()
1662
1663        try: 
1664            # This catches exceptions to clear the placeholder if necessary
1665            try:
1666                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1667            except ValueError:
1668                raise service_error(service_error.server_config, 
1669                        "Bad key type (%s)" % self.ssh_type)
1670
1671            user = req.get('user', None)
1672            if user == None:
1673                raise service_error(service_error.req, "No user")
1674
1675            master = req.get('master', None)
1676            if not master:
1677                raise service_error(service_error.req,
1678                        "No master testbed label")
1679            export_project = req.get('exportProject', None)
1680            if not export_project:
1681                raise service_error(service_error.req, "No export project")
1682           
1683            if self.splitter_url:
1684                self.log.debug("Calling remote splitter at %s" % \
1685                        self.splitter_url)
1686                split_data = self.remote_splitter(self.splitter_url,
1687                        file_content, master)
1688            else:
1689                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1690                    str(self.muxmax), '-m', master]
1691
1692                if self.fedkit:
1693                    tclcmd.append('-k')
1694
1695                if self.gatewaykit:
1696                    tclcmd.append('-K')
1697
1698                tclcmd.extend([pid, gid, eid, tclfile])
1699
1700                self.log.debug("running local splitter %s", " ".join(tclcmd))
1701                tclparser = Popen(tclcmd, stdout=PIPE)
1702                split_data = tclparser.stdout
1703
1704            allocated = { }         # Testbeds we can access
1705            started = { }           # Testbeds where a sub-experiment started
1706                                # successfully
1707
1708            # Objects to parse the splitter output (defined above)
1709            parse_current_testbed = self.current_testbed(eid, tmpdir,
1710                    self.fedkit, self.gatewaykit)
1711            parse_allbeds = self.allbeds(self.get_access)
1712            parse_gateways = self.gateways(eid, master, tmpdir,
1713                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1714                    self.fedkit)
1715            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1716                        "^#\s+End\s+Vtopo")
1717            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1718                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1719            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1720                    "^#\s+End\s+tarfiles")
1721            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1722                    "^#\s+End\s+rpms")
1723
1724            # Working on the split data
1725            for line in split_data:
1726                line = line.rstrip()
1727                if parse_current_testbed(line, master, allocated, tbparams):
1728                    continue
1729                elif parse_allbeds(line, user, tbparams, master, export_project,
1730                        access_user):
1731                    continue
1732                elif parse_gateways(line, allocated, tbparams):
1733                    continue
1734                elif parse_vtopo(line):
1735                    continue
1736                elif parse_hostnames(line):
1737                    continue
1738                elif parse_tarfiles(line):
1739                    continue
1740                elif parse_rpms(line):
1741                    continue
1742                else:
1743                    raise service_error(service_error.internal, 
1744                            "Bad tcl parse? %s" % line)
1745            # Virtual topology and visualization
1746            vtopo = self.gentopo(parse_vtopo.str)
1747            if not vtopo:
1748                raise service_error(service_error.internal, 
1749                        "Failed to generate virtual topology")
1750
1751            vis = self.genviz(vtopo)
1752            if not vis:
1753                raise service_error(service_error.internal, 
1754                        "Failed to generate visualization")
1755           
1756            # save federant information
1757            for k in allocated.keys():
1758                tbparams[k]['federant'] = {\
1759                        'name': [ { 'localname' : eid} ],\
1760                        'emulab': tbparams[k]['emulab'],\
1761                        'allocID' : tbparams[k]['allocID'],\
1762                        'master' : k == master,\
1763                    }
1764
1765
1766            # Copy tarfiles and rpms needed at remote sites into a staging area
1767            try:
1768                if self.fedkit:
1769                    for t in self.fedkit:
1770                        parse_tarfiles.list.append(t[1])
1771                if self.gatewaykit:
1772                    for t in self.gatewaykit:
1773                        parse_tarfiles.list.append(t[1])
1774                for t in parse_tarfiles.list:
1775                    if not os.path.exists("%s/tarfiles" % tmpdir):
1776                        os.mkdir("%s/tarfiles" % tmpdir)
1777                    self.copy_file(t, "%s/tarfiles/%s" % \
1778                            (tmpdir, os.path.basename(t)))
1779                for r in parse_rpms.list:
1780                    if not os.path.exists("%s/rpms" % tmpdir):
1781                        os.mkdir("%s/rpms" % tmpdir)
1782                    self.copy_file(r, "%s/rpms/%s" % \
1783                            (tmpdir, os.path.basename(r)))
1784                # A null experiment file in case we need to create a remote
1785                # experiment from scratch
1786                f = open("%s/null.tcl" % tmpdir, "w")
1787                print >>f, """
1788set ns [new Simulator]
1789source tb_compat.tcl
1790
1791set a [$ns node]
1792
1793$ns rtproto Session
1794$ns run
1795"""
1796                f.close()
1797
1798            except IOError, e:
1799                raise service_error(service_error.internal, 
1800                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1801
1802        except service_error, e:
1803            # If something goes wrong in the parse (usually an access error)
1804            # clear the placeholder state.  From here on out the code delays
1805            # exceptions.
1806            self.state_lock.acquire()
1807            del self.state[eid]
1808            self.state_lock.release()
1809            raise e
1810
1811        thread_pool = self.thread_pool(self.nthreads)
1812        threads = [ ]
1813
1814        for tb in [ k for k in allocated.keys() if k != master]:
1815            # Create and start a thread to start the segment, and save it to
1816            # get the return value later
1817            thread_pool.wait_for_slot()
1818            t  = self.pooled_thread(target=self.start_segment, 
1819                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1820                    pdata=thread_pool, trace_file=self.trace_file)
1821            threads.append(t)
1822            t.start()
1823
1824        # Wait until all finish
1825        thread_pool.wait_for_all_done()
1826
1827        # If none failed, start the master
1828        failed = [ t.getName() for t in threads if not t.rv ]
1829
1830        if len(failed) == 0:
1831            if not self.start_segment(master, eid, tbparams, tmpdir):
1832                failed.append(master)
1833
1834        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1835        # If one failed clean up, unless fail_soft is set
1836        if failed:
1837            if not fail_soft:
1838                thread_pool.clear()
1839                for tb in succeeded:
1840                    # Create and start a thread to stop the segment
1841                    thread_pool.wait_for_slot()
1842                    t  = self.pooled_thread(target=self.stop_segment, 
1843                            args=(tb, eid, tbparams), name=tb,
1844                            pdata=thread_pool, trace_file=self.trace_file)
1845                    t.start()
1846                # Wait until all finish
1847                thread_pool.wait_for_all_done()
1848
1849                # release the allocations
1850                for tb in tbparams.keys():
1851                    self.release_access(tb, tbparams[tb]['allocID'])
1852                # Remove the placeholder
1853                self.state_lock.acquire()
1854                del self.state[eid]
1855                self.state_lock.release()
1856
1857                raise service_error(service_error.federant,
1858                    "Swap in failed on %s" % ",".join(failed))
1859        else:
1860            self.log.info("[start_segment]: Experiment %s started" % eid)
1861
1862        # Generate an ID for the experiment (slice) and a certificate that the
1863        # allocator can use to prove they own it.  We'll ship it back through
1864        # the encrypted connection.
1865        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1866
1867        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1868
1869        # Walk up tmpdir, deleting as we go
1870        for path, dirs, files in os.walk(tmpdir, topdown=False):
1871            for f in files:
1872                os.remove(os.path.join(path, f))
1873            for d in dirs:
1874                os.rmdir(os.path.join(path, d))
1875        os.rmdir(tmpdir)
1876
1877        # The deepcopy prevents the allocation ID and other binaries from being
1878        # translated into other formats
1879        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1880                for tb in tbparams.keys() \
1881                    if tbparams[tb].has_key('federant') ],\
1882                    'vtopo': vtopo,\
1883                    'vis' : vis,
1884                    'experimentID' : [\
1885                            { 'fedid': copy.copy(expid) }, \
1886                            { 'localname': eid },\
1887                        ],\
1888                    'experimentAccess': { 'X509' : expcert },\
1889                }
1890        # remove the allocationID info from each federant
1891        for f in resp['federant']:
1892            if f.has_key('allocID'): del f['allocID']
1893
1894        # Insert the experiment into our state and update the disk copy
1895        self.state_lock.acquire()
1896        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1897                for tb in tbparams.keys() \
1898                    if tbparams[tb].has_key('federant') ],\
1899                    'vtopo': vtopo,\
1900                    'vis' : vis,
1901                    'owner': fid,
1902                    'experimentID' : [\
1903                            { 'fedid': expid }, { 'localname': eid },\
1904                        ],\
1905                }
1906        self.state[eid] = self.state[expid]
1907        if self.state_filename: self.write_state()
1908        self.state_lock.release()
1909
1910        self.auth.set_attribute(fid, expid)
1911        self.auth.set_attribute(expid, expid)
1912
1913        if not failed:
1914            return resp
1915        else:
1916            raise service_error(service_error.partial, \
1917                    "Partial swap in on %s" % ",".join(succeeded))
1918
1919    def check_experiment_access(self, fid, key):
1920        """
1921        Confirm that the fid has access to the experiment.  Though a request
1922        may be made in terms of a local name, the access attribute is always
1923        the experiment's fedid.
1924        """
1925        if not isinstance(key, fedid):
1926            self.state_lock.acquire()
1927            if self.state.has_key(key):
1928                if isinstance(self.state[key], dict):
1929                    try:
1930                        kl = [ f['fedid'] for f in \
1931                                self.state[key]['experimentID']\
1932                                    if f.has_key('fedid') ]
1933                    except KeyError:
1934                        self.state_lock.release()
1935                        raise service_error(service_error.internal, 
1936                                "No fedid for experiment %s when checking " +\
1937                                        "access(!?)" % key)
1938                    if len(kl) == 1:
1939                        key = kl[0]
1940                    else:
1941                        self.state_lock.release()
1942                        raise service_error(service_error.internal, 
1943                                "multiple fedids for experiment %s when " +\
1944                                        "checking access(!?)" % key)
1945                elif isinstance(self.state[key], str):
1946                    self.state_lock.release()
1947                    raise service_error(service_error.internal, 
1948                            ("experiment %s is placeholder.  " +\
1949                                    "Creation in progress or aborted oddly") \
1950                                    % key)
1951                else:
1952                    self.state_lock.release()
1953                    raise service_error(service_error.internal, 
1954                            "Unexpected state for %s" % key)
1955
1956            else:
1957                self.state_lock.release()
1958                raise service_error(service_error.access, "Access Denied")
1959            self.state_lock.release()
1960
1961        if self.auth.check_attribute(fid, key):
1962            return True
1963        else:
1964            raise service_error(service_error.access, "Access Denied")
1965
1966
1967
1968    def get_vtopo(self, req, fid):
1969        """
1970        Return the stored virtual topology for this experiment
1971        """
1972        rv = None
1973
1974        req = req.get('VtopoRequestBody', None)
1975        if not req:
1976            raise service_error(service_error.req,
1977                    "Bad request format (no VtopoRequestBody)")
1978        exp = req.get('experiment', None)
1979        if exp:
1980            if exp.has_key('fedid'):
1981                key = exp['fedid']
1982                keytype = "fedid"
1983            elif exp.has_key('localname'):
1984                key = exp['localname']
1985                keytype = "localname"
1986            else:
1987                raise service_error(service_error.req, "Unknown lookup type")
1988        else:
1989            raise service_error(service_error.req, "No request?")
1990
1991        self.check_experiment_access(fid, key)
1992
1993        self.state_lock.acquire()
1994        if self.state.has_key(key):
1995            rv = { 'experiment' : {keytype: key },\
1996                    'vtopo': self.state[key]['vtopo'],\
1997                }
1998        self.state_lock.release()
1999
2000        if rv: return rv
2001        else: raise service_error(service_error.req, "No such experiment")
2002
2003    def get_vis(self, req, fid):
2004        """
2005        Return the stored visualization for this experiment
2006        """
2007        rv = None
2008
2009        req = req.get('VisRequestBody', None)
2010        if not req:
2011            raise service_error(service_error.req,
2012                    "Bad request format (no VisRequestBody)")
2013        exp = req.get('experiment', None)
2014        if exp:
2015            if exp.has_key('fedid'):
2016                key = exp['fedid']
2017                keytype = "fedid"
2018            elif exp.has_key('localname'):
2019                key = exp['localname']
2020                keytype = "localname"
2021            else:
2022                raise service_error(service_error.req, "Unknown lookup type")
2023        else:
2024            raise service_error(service_error.req, "No request?")
2025
2026        self.check_experiment_access(fid, key)
2027
2028        self.state_lock.acquire()
2029        if self.state.has_key(key):
2030            rv =  { 'experiment' : {keytype: key },\
2031                    'vis': self.state[key]['vis'],\
2032                    }
2033        self.state_lock.release()
2034
2035        if rv: return rv
2036        else: raise service_error(service_error.req, "No such experiment")
2037
2038    def get_info(self, req, fid):
2039        """
2040        Return all the stored info about this experiment
2041        """
2042        rv = None
2043
2044        req = req.get('InfoRequestBody', None)
2045        if not req:
2046            raise service_error(service_error.req,
2047                    "Bad request format (no VisRequestBody)")
2048        exp = req.get('experiment', None)
2049        if exp:
2050            if exp.has_key('fedid'):
2051                key = exp['fedid']
2052                keytype = "fedid"
2053            elif exp.has_key('localname'):
2054                key = exp['localname']
2055                keytype = "localname"
2056            else:
2057                raise service_error(service_error.req, "Unknown lookup type")
2058        else:
2059            raise service_error(service_error.req, "No request?")
2060
2061        self.check_experiment_access(fid, key)
2062
2063        # The state may be massaged by the service function that called
2064        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2065        # state.
2066        self.state_lock.acquire()
2067        if self.state.has_key(key):
2068            rv = copy.deepcopy(self.state[key])
2069        self.state_lock.release()
2070        # Remove the owner info
2071        del rv['owner']
2072        # remove the allocationID info from each federant
2073        for f in rv['federant']:
2074            if f.has_key('allocID'): del f['allocID']
2075
2076        if rv: return rv
2077        else: raise service_error(service_error.req, "No such experiment")
2078
2079
2080    def terminate_experiment(self, req, fid):
2081        """
2082        Swap this experiment out on the federants and delete the shared
2083        information
2084        """
2085        tbparams = { }
2086        req = req.get('TerminateRequestBody', None)
2087        if not req:
2088            raise service_error(service_error.req,
2089                    "Bad request format (no TerminateRequestBody)")
2090        exp = req.get('experiment', None)
2091        if exp:
2092            if exp.has_key('fedid'):
2093                key = exp['fedid']
2094                keytype = "fedid"
2095            elif exp.has_key('localname'):
2096                key = exp['localname']
2097                keytype = "localname"
2098            else:
2099                raise service_error(service_error.req, "Unknown lookup type")
2100        else:
2101            raise service_error(service_error.req, "No request?")
2102
2103        self.check_experiment_access(fid, key)
2104
2105        self.state_lock.acquire()
2106        fed_exp = self.state.get(key, None)
2107
2108        if fed_exp:
2109            # This branch of the conditional holds the lock to generate a
2110            # consistent temporary tbparams variable to deallocate experiments.
2111            # It releases the lock to do the deallocations and reacquires it to
2112            # remove the experiment state when the termination is complete.
2113            ids = []
2114            #  experimentID is a list of dicts that are self-describing
2115            #  identifiers.  This finds all the fedids and localnames - the
2116            #  keys of self.state - and puts them into ids.
2117            for id in fed_exp.get('experimentID', []):
2118                if id.has_key('fedid'): ids.append(id['fedid'])
2119                if id.has_key('localname'): ids.append(id['localname'])
2120
2121            # Construct enough of the tbparams to make the stop_segment calls
2122            # work
2123            for fed in fed_exp['federant']:
2124                try:
2125                    for e in fed['name']:
2126                        eid = e.get('localname', None)
2127                        if eid: break
2128                    else:
2129                        continue
2130
2131                    p = fed['emulab']['project']
2132
2133                    project = p['name']['localname']
2134                    tb = p['testbed']['localname']
2135                    user = p['user'][0]['userID']['localname']
2136
2137                    domain = fed['emulab']['domain']
2138                    host  = fed['emulab']['ops']
2139                    aid = fed['allocID']
2140                except KeyError, e:
2141                    continue
2142                tbparams[tb] = {\
2143                        'user': user,\
2144                        'domain': domain,\
2145                        'project': project,\
2146                        'host': host,\
2147                        'eid': eid,\
2148                        'aid': aid,\
2149                    }
2150            self.state_lock.release()
2151
2152            # Stop everyone.
2153            thread_pool = self.thread_pool(self.nthreads)
2154            for tb in tbparams.keys():
2155                # Create and start a thread to stop the segment
2156                thread_pool.wait_for_slot()
2157                t  = self.pooled_thread(target=self.stop_segment, 
2158                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2159                        pdata=thread_pool, trace_file=self.trace_file)
2160                t.start()
2161            # Wait for completions
2162            thread_pool.wait_for_all_done()
2163
2164            # release the allocations
2165            for tb in tbparams.keys():
2166                self.release_access(tb, tbparams[tb]['aid'])
2167
2168            # Remove the terminated experiment
2169            self.state_lock.acquire()
2170            for id in ids:
2171                if self.state.has_key(id): del self.state[id]
2172
2173            if self.state_filename: self.write_state()
2174            self.state_lock.release()
2175
2176            return { 'experiment': exp }
2177        else:
2178            # Don't forget to release the lock
2179            self.state_lock.release()
2180            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.