source: fedd/federation/experiment_control.py @ c2dbca8

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since c2dbca8 was c2dbca8, checked in by Ted Faber <faber@…>, 15 years ago

Now all the filesystem tree stuff is done in a script.

  • Property mode set to 100644
File size: 85.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44   
45    class thread_pool:
46        """
47        A class to keep track of a set of threads all invoked for the same
48        task.  Manages the mutual exclusion of the states.
49        """
50        def __init__(self, nthreads):
51            """
52            Start a pool.
53            """
54            self.changed = Condition()
55            self.started = 0
56            self.terminated = 0
57            self.nthreads = nthreads
58
59        def acquire(self):
60            """
61            Get the pool's lock.
62            """
63            self.changed.acquire()
64
65        def release(self):
66            """
67            Release the pool's lock.
68            """
69            self.changed.release()
70
71        def wait(self, timeout = None):
72            """
73            Wait for a pool thread to start or stop.
74            """
75            self.changed.wait(timeout)
76
77        def start(self):
78            """
79            Called by a pool thread to report starting.
80            """
81            self.changed.acquire()
82            self.started += 1
83            self.changed.notifyAll()
84            self.changed.release()
85
86        def terminate(self):
87            """
88            Called by a pool thread to report finishing.
89            """
90            self.changed.acquire()
91            self.terminated += 1
92            self.changed.notifyAll()
93            self.changed.release()
94
95        def clear(self):
96            """
97            Clear all pool data.
98            """
99            self.changed.acquire()
100            self.started = 0
101            self.terminated =0
102            self.changed.notifyAll()
103            self.changed.release()
104
105        def wait_for_slot(self):
106            """
107            Wait until we have a free slot to start another pooled thread
108            """
109            self.acquire()
110            while self.started - self.terminated >= self.nthreads:
111                self.wait()
112            self.release()
113
114        def wait_for_all_done(self):
115            """
116            Wait until all active threads finish (and at least one has started)
117            """
118            self.acquire()
119            while self.started == 0 or self.started > self.terminated:
120                self.wait()
121            self.release()
122
123    class pooled_thread(Thread):
124        """
125        One of a set of threads dedicated to a specific task.  Uses the
126        thread_pool class above for coordination.
127        """
128        def __init__(self, group=None, target=None, name=None, args=(), 
129                kwargs={}, pdata=None, trace_file=None):
130            Thread.__init__(self, group, target, name, args, kwargs)
131            self.rv = None          # Return value of the ops in this thread
132            self.exception = None   # Exception that terminated this thread
133            self.target=target      # Target function to run on start()
134            self.args = args        # Args to pass to target
135            self.kwargs = kwargs    # Additional kw args
136            self.pdata = pdata      # thread_pool for this class
137            # Logger for this thread
138            self.log = logging.getLogger("fedd.experiment_control")
139       
140        def run(self):
141            """
142            Emulate Thread.run, except add pool data manipulation and error
143            logging.
144            """
145            if self.pdata:
146                self.pdata.start()
147
148            if self.target:
149                try:
150                    self.rv = self.target(*self.args, **self.kwargs)
151                except service_error, s:
152                    self.exception = s
153                    self.log.error("Thread exception: %s %s" % \
154                            (s.code_string(), s.desc))
155                except:
156                    self.exception = sys.exc_info()[1]
157                    self.log.error(("Unexpected thread exception: %s" +\
158                            "Trace %s") % (self.exception,\
159                                traceback.format_exc()))
160            if self.pdata:
161                self.pdata.terminate()
162
163    call_RequestAccess = service_caller('RequestAccess')
164    call_ReleaseAccess = service_caller('ReleaseAccess')
165    call_Ns2Split = service_caller('Ns2Split')
166
167    def __init__(self, config=None, auth=None):
168        """
169        Intialize the various attributes, most from the config object
170        """
171
172        def parse_tarfile_list(tf):
173            """
174            Parse a tarfile list from the configuration.  This is a set of
175            paths and tarfiles separated by spaces.
176            """
177            rv = [ ]
178            if tf is not None:
179                tl = tf.split()
180                while len(tl) > 1:
181                    p, t = tl[0:2]
182                    del tl[0:2]
183                    rv.append((p, t))
184            return rv
185
186        self.thread_with_rv = experiment_control_local.pooled_thread
187        self.thread_pool = experiment_control_local.thread_pool
188
189        self.cert_file = config.get("experiment_control", "cert_file")
190        if self.cert_file:
191            self.cert_pwd = config.get("experiment_control", "cert_pwd")
192        else:
193            self.cert_file = config.get("globals", "cert_file")
194            self.cert_pwd = config.get("globals", "cert_pwd")
195
196        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
197                or config.get("globals", "trusted_certs")
198
199        self.exp_stem = "fed-stem"
200        self.log = logging.getLogger("fedd.experiment_control")
201        set_log_level(config, "experiment_control", self.log)
202        self.muxmax = 2
203        self.nthreads = 2
204        self.randomize_experiments = False
205
206        self.scp_exec = "/usr/bin/scp"
207        self.splitter = None
208        self.ssh_exec="/usr/bin/ssh"
209        self.ssh_keygen = "/usr/bin/ssh-keygen"
210        self.ssh_identity_file = None
211
212
213        self.debug = config.getboolean("experiment_control", "create_debug")
214        self.state_filename = config.get("experiment_control", 
215                "experiment_state")
216        self.splitter_url = config.get("experiment_control", "splitter_uri")
217        self.fedkit = parse_tarfile_list(\
218                config.get("experiment_control", "fedkit"))
219        self.gatewaykit = parse_tarfile_list(\
220                config.get("experiment_control", "gatewaykit"))
221        accessdb_file = config.get("experiment_control", "accessdb")
222
223        self.ssh_pubkey_file = config.get("experiment_control", 
224                "ssh_pubkey_file")
225        self.ssh_privkey_file = config.get("experiment_control",
226                "ssh_privkey_file")
227        # NB for internal master/slave ops, not experiment setup
228        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
229        self.state = { }
230        self.state_lock = Lock()
231        self.tclsh = "/usr/local/bin/otclsh"
232        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
233                config.get("experiment_control", "tcl_splitter",
234                        "/usr/testbed/lib/ns2ir/parse.tcl")
235        mapdb_file = config.get("experiment_control", "mapdb")
236        self.trace_file = sys.stderr
237
238        self.def_expstart = \
239                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
240                "/tmp/federate";
241        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
242                "FEDDIR/hosts";
243        self.def_gwstart = \
244                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
245                "/tmp/bridge.log";
246        self.def_mgwstart = \
247                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
248                "/tmp/bridge.log";
249        self.def_gwimage = "FBSD61-TUNNEL2";
250        self.def_gwtype = "pc";
251        self.local_access = { }
252
253        if auth:
254            self.auth = auth
255        else:
256            self.log.error(\
257                    "[access]: No authorizer initialized, creating local one.")
258            auth = authorizer()
259
260
261        if self.ssh_pubkey_file:
262            try:
263                f = open(self.ssh_pubkey_file, 'r')
264                self.ssh_pubkey = f.read()
265                f.close()
266            except IOError:
267                raise service_error(service_error.internal,
268                        "Cannot read sshpubkey")
269        else:
270            raise service_error(service_error.internal, 
271                    "No SSH public key file?")
272
273        if not self.ssh_privkey_file:
274            raise service_error(service_error.internal, 
275                    "No SSH public key file?")
276
277
278        if mapdb_file:
279            self.read_mapdb(mapdb_file)
280        else:
281            self.log.warn("[experiment_control] No testbed map, using defaults")
282            self.tbmap = { 
283                    'deter':'https://users.isi.deterlab.net:23235',
284                    'emulab':'https://users.isi.deterlab.net:23236',
285                    'ucb':'https://users.isi.deterlab.net:23237',
286                    }
287
288        if accessdb_file:
289                self.read_accessdb(accessdb_file)
290        else:
291            raise service_error(service_error.internal,
292                    "No accessdb specified in config")
293
294        # Grab saved state.  OK to do this w/o locking because it's read only
295        # and only one thread should be in existence that can see self.state at
296        # this point.
297        if self.state_filename:
298            self.read_state()
299
300        # Dispatch tables
301        self.soap_services = {\
302                'Create': soap_handler('Create', self.create_experiment),
303                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
304                'Vis': soap_handler('Vis', self.get_vis),
305                'Info': soap_handler('Info', self.get_info),
306                'Terminate': soap_handler('Terminate', 
307                    self.terminate_experiment),
308        }
309
310        self.xmlrpc_services = {\
311                'Create': xmlrpc_handler('Create', self.create_experiment),
312                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
313                'Vis': xmlrpc_handler('Vis', self.get_vis),
314                'Info': xmlrpc_handler('Info', self.get_info),
315                'Terminate': xmlrpc_handler('Terminate',
316                    self.terminate_experiment),
317        }
318
319    def copy_file(self, src, dest, size=1024):
320        """
321        Exceedingly simple file copy.
322        """
323        s = open(src,'r')
324        d = open(dest, 'w')
325
326        buf = "x"
327        while buf != "":
328            buf = s.read(size)
329            d.write(buf)
330        s.close()
331        d.close()
332
333    # Call while holding self.state_lock
334    def write_state(self):
335        """
336        Write a new copy of experiment state after copying the existing state
337        to a backup.
338
339        State format is a simple pickling of the state dictionary.
340        """
341        if os.access(self.state_filename, os.W_OK):
342            self.copy_file(self.state_filename, \
343                    "%s.bak" % self.state_filename)
344        try:
345            f = open(self.state_filename, 'w')
346            pickle.dump(self.state, f)
347        except IOError, e:
348            self.log.error("Can't write file %s: %s" % \
349                    (self.state_filename, e))
350        except pickle.PicklingError, e:
351            self.log.error("Pickling problem: %s" % e)
352        except TypeError, e:
353            self.log.error("Pickling problem (TypeError): %s" % e)
354
355    # Call while holding self.state_lock
356    def read_state(self):
357        """
358        Read a new copy of experiment state.  Old state is overwritten.
359
360        State format is a simple pickling of the state dictionary.
361        """
362        try:
363            f = open(self.state_filename, "r")
364            self.state = pickle.load(f)
365            self.log.debug("[read_state]: Read state from %s" % \
366                    self.state_filename)
367        except IOError, e:
368            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
369                    % (self.state_filename, e))
370        except pickle.UnpicklingError, e:
371            self.log.warning(("[read_state]: No saved state: " + \
372                    "Unpickling failed: %s") % e)
373       
374        for k in self.state.keys():
375            try:
376                # This list should only have one element in it, but phrasing it
377                # as a for loop doesn't cost much, really.  We have to find the
378                # fedid elements anyway.
379                for eid in [ f['fedid'] \
380                        for f in self.state[k]['experimentID']\
381                            if f.has_key('fedid') ]:
382                    self.auth.set_attribute(self.state[k]['owner'], eid)
383            except KeyError, e:
384                self.log.warning("[read_state]: State ownership or identity " +\
385                        "misformatted in %s: %s" % (self.state_filename, e))
386
387
388    def read_accessdb(self, accessdb_file):
389        """
390        Read the mapping from fedids that can create experiments to their name
391        in the 3-level access namespace.  All will be asserted from this
392        testbed and can include the local username and porject that will be
393        asserted on their behalf by this fedd.  Each fedid is also added to the
394        authorization system with the "create" attribute.
395        """
396        self.accessdb = {}
397        # These are the regexps for parsing the db
398        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
399        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
400                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
401        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
402                "\s*->\s*(" + name_expr + ")\s*$")
403        lineno = 0
404
405        # Parse the mappings and store in self.authdb, a dict of
406        # fedid -> (proj, user)
407        try:
408            f = open(accessdb_file, "r")
409            for line in f:
410                lineno += 1
411                line = line.strip()
412                if len(line) == 0 or line.startswith('#'):
413                    continue
414                m = project_line.match(line)
415                if m:
416                    fid = fedid(hexstr=m.group(1))
417                    project, user = m.group(2,3)
418                    if not self.accessdb.has_key(fid):
419                        self.accessdb[fid] = []
420                    self.accessdb[fid].append((project, user))
421                    continue
422
423                m = user_line.match(line)
424                if m:
425                    fid = fedid(hexstr=m.group(1))
426                    project = None
427                    user = m.group(2)
428                    if not self.accessdb.has_key(fid):
429                        self.accessdb[fid] = []
430                    self.accessdb[fid].append((project, user))
431                    continue
432                self.log.warn("[experiment_control] Error parsing access " +\
433                        "db %s at line %d" %  (accessdb_file, lineno))
434        except IOError:
435            raise service_error(service_error.internal,
436                    "Error opening/reading %s as experiment " +\
437                            "control accessdb" %  accessdb_file)
438        f.close()
439
440        # Initialize the authorization attributes
441        for fid in self.accessdb.keys():
442            self.auth.set_attribute(fid, 'create')
443
444    def read_mapdb(self, file):
445        """
446        Read a simple colon separated list of mappings for the
447        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
448        """
449
450        self.tbmap = { }
451        lineno =0
452        try:
453            f = open(file, "r")
454            for line in f:
455                lineno += 1
456                line = line.strip()
457                if line.startswith('#') or len(line) == 0:
458                    continue
459                try:
460                    label, url = line.split(':', 1)
461                    self.tbmap[label] = url
462                except ValueError, e:
463                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
464                            "map db: %s %s" % (lineno, line, e))
465        except IOError, e:
466            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
467                    "open %s: %s" % (file, e))
468        f.close()
469
470    def scp_file(self, file, user, host, dest=""):
471        """
472        scp a file to the remote host.  If debug is set the action is only
473        logged.
474        """
475
476        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
477                '-o', 'StrictHostKeyChecking yes', '-i', 
478                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
479        rv = 0
480
481        try:
482            dnull = open("/dev/null", "w")
483        except IOError:
484            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
485            dnull = Null
486
487        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
488        if not self.debug:
489            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
490
491        return rv == 0
492
493    def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
494        """
495        Run a remote command on host as user.  If debug is set, the action is
496        only logged.
497        """
498        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
499                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
500                (self.ssh_exec, self.ssh_privkey_file, 
501                        user, host, cmd)
502
503        try:
504            dnull = open("/dev/null", "r")
505        except IOError:
506            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
507            dnull = Null
508
509        self.log.debug("[ssh_cmd]: %s" % sh_str)
510        if not self.debug:
511            if dnull:
512                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
513            else:
514                sub = Popen(sh_str, shell=True)
515            if timeout:
516                i = 0
517                rv = sub.poll()
518                while i < timeout:
519                    if rv is not None: break
520                    else:
521                        time.sleep(1)
522                        rv = sub.poll()
523                        i += 1
524                else:
525                    self.log.debug("Process exceeded runtime: %s" % sh_str)
526                    os.kill(sub.pid, signal.SIGKILL)
527                    raise self.ssh_cmd_timeout();
528                return rv == 0
529            else:
530                return sub.wait() == 0
531        else:
532            return True
533
534
535    def create_config_tree(self, src_dir, dest_dir, script):
536        """
537        Append commands to script that will create the directory hierarchy on
538        the remote federant.
539        """
540
541        if os.path.isdir(src_dir):
542            print >>script, "mkdir -p %s" % dest_dir
543            print >>script, "chmod 770 %s" % dest_dir
544
545            for f in os.listdir(src_dir):
546                if os.path.isdir(f):
547                    self.create_config_tree("%s/%s" % (src_dir, f), 
548                            "%s/%s" % (dest_dir, f), script)
549        else:
550            self.log.debug("[create_config_tree]: Not a directory: %s" \
551                    % src_dir)
552
553    def ship_configs(self, host, user, src_dir, dest_dir):
554        """
555        Copy federant-specific configuration files to the federant.
556        """
557        for f in os.listdir(src_dir):
558            if os.path.isdir(f):
559                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
560                        "%s/%s" % (dest_dir, f)):
561                    return False
562            else:
563                if not self.scp_file("%s/%s" % (src_dir, f), 
564                        user, host, dest_dir):
565                    return False
566        return True
567
568    def get_state(self, user, host, ssh_key, tb, pid, eid):
569        # command to test experiment state
570        expinfo_exec = "/usr/testbed/bin/expinfo" 
571        # Regular expressions to parse the expinfo response
572        state_re = re.compile("State:\s+(\w+)")
573        no_exp_re = re.compile("^No\s+such\s+experiment")
574        swapping_re = re.compile("^No\s+information\s+available.")
575        state = None    # Experiment state parsed from expinfo
576        # The expinfo ssh command.  Note the identity restriction to use only
577        # the identity provided in the pubkey given.
578        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
579                'StrictHostKeyChecking yes', '-i', 
580                ssh_key, "%s@%s" % (user, host), 
581                expinfo_exec, pid, eid]
582
583        dev_null = None
584        try:
585            dev_null = open("/dev/null", "a")
586        except IOError, e:
587            self.log.error("[get_state]: can't open /dev/null: %s" %e)
588
589        if self.debug:
590            state = 'swapped'
591            rv = 0
592        else:
593            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
594            for line in status.stdout:
595                m = state_re.match(line)
596                if m: state = m.group(1)
597                else:
598                    for reg, st in ((no_exp_re, "none"),
599                            (swapping_re, "swapping")):
600                        m = reg.match(line)
601                        if m: state = st
602            rv = status.wait()
603
604        # If the experiment is not present the subcommand returns a non-zero
605        # return value.  If we successfully parsed a "none" outcome, ignore the
606        # return code.
607        if rv != 0 and state != 'none':
608            raise service_error(service_error.internal,
609                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
610        elif state not in ('active', 'swapped', 'swapping', 'none'):
611            raise service_error(service_error.internal,
612                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
613        else: return state
614
615
616    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
617        """
618        Start a sub-experiment on a federant.
619
620        Get the current state, modify or create as appropriate, ship data and
621        configs and start the experiment.  There are small ordering differences
622        based on the initial state of the sub-experiment.
623        """
624        # ops node in the federant
625        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
626        user = tbparams[tb]['user']     # federant user
627        pid = tbparams[tb]['project']   # federant project
628        # XXX
629        base_confs = ( "hosts",)
630        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
631        # Configuration directories on the remote machine
632        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
633        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
634        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
635
636        state = self.get_state(user, host, self.ssh_privkey_file, tb, pid, eid)
637
638        self.log.debug("[start_segment]: %s: %s" % (tb, state))
639        self.log.info("[start_segment]:transferring experiment to %s" % tb)
640
641        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
642            return False
643       
644        if state == 'none':
645            # Create a null copy of the experiment so that we capture any logs
646            # there if the modify fails.  Emulab software discards the logs
647            # from a failed startexp
648            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
649                return False
650            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
651            timedout = False
652            try:
653                if not self.ssh_cmd(user, host,
654                        ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
655                        "-e %s null.tcl") % (pid, eid), "startexp",
656                        timeout=60 * 10):
657                    return False
658            except self.ssh_cmd_timeout:
659                timedout = True
660
661            if timedout:
662                state = self.get_state(user, host, self.ssh_privkey_file, 
663                        tb, eid, pid)
664                if state != "swapped":
665                    return False
666
667       
668        # Open up a temporary file to contain a script for setting up the
669        # filespace for the new experiment.
670        self.log.info("[start_segment]: creating script file")
671        try:
672            sf, scriptname = tempfile.mkstemp()
673            scriptfile = os.fdopen(sf, 'w')
674        except IOError:
675            return False
676
677        scriptbase = os.path.basename(scriptname)
678
679        # Script the filesystem changes
680        print >>scriptfile, "/bin/rm -rf %s" % proj_dir
681        # Clear and create the tarfiles and rpm directories
682        for d in (tarfiles_dir, rpms_dir):
683            print >>scriptfile, "/bin/rm -rf %s/*" % d
684            print >>scriptfile, "mkdir -p %s" % d
685        print >>scriptfile, 'mkdir -p %s' % proj_dir
686        self.create_config_tree("%s/%s" % (tmpdir, tb), proj_dir, scriptfile)
687        if os.path.isdir("%s/tarfiles" % tmpdir):
688            self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
689                    scriptfile)
690        if os.path.isdir("%s/rpms" % tmpdir):
691            self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
692                    scriptfile)
693        print >>scriptfile, "rm -f %s" % scriptbase
694        scriptfile.close()
695
696        # Move the script to the remote machine
697        # XXX: could collide tempfile names on the remote host
698        if self.scp_file(scriptname, user, host, scriptbase):
699            os.remove(scriptname)
700        else:
701            return False
702
703        # Execute the script (and the script's last line deletes it)
704        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
705            return False
706
707        for f in base_confs:
708            if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
709                    "%s/%s" % (proj_dir, f)):
710                return False
711        if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
712                proj_dir):
713            return False
714        if os.path.isdir("%s/tarfiles" % tmpdir):
715            if not self.ship_configs(host, user,
716                    "%s/tarfiles" % tmpdir, tarfiles_dir):
717                return False
718        if os.path.isdir("%s/rpms" % tmpdir):
719            if not self.ship_configs(host, user,
720                    "%s/rpms" % tmpdir, tarfiles_dir):
721                return False
722        # Stage the new configuration (active experiments will stay swapped in
723        # now)
724        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
725        try:
726            if not self.ssh_cmd(user, host,
727                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
728                            (pid, eid, tclfile),
729                    "modexp", timeout= 60 * 10):
730                return False
731        except self.ssh_cmd_timeout:
732            print "modexp timeout"
733            # There's really no way to see if this succeeded or failed, so if
734            # it hangs, assume the worst.
735            return False
736        # Active experiments are still swapped, this swaps the others in.
737        if state != 'active':
738            self.log.info("[start_segment]: Swapping %s in on %s" % \
739                    (eid, tb))
740            timedout = False
741            try:
742                if not self.ssh_cmd(user, host,
743                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
744                        "swapexp", timeout=10*60):
745                    return False
746            except self.ssh_cmd_timeout:
747                timedout = True
748           
749            # If the command was terminated, but completed successfully, report
750            # success.
751            if timedout:
752                state = self.get_state(user, host, self.ssh_privkey_file,
753                        tb, eid, pid)
754                self.log.debug("[start_segment]: swapin timed out (state)")
755                return state == 'active'
756        # Everything has gone OK.
757        return True
758
759    def stop_segment(self, tb, eid, tbparams):
760        """
761        Stop a sub experiment by calling swapexp on the federant
762        """
763        user = tbparams[tb]['user']
764        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
765        pid = tbparams[tb]['project']
766
767        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
768        return self.ssh_cmd(user, host,
769                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
770
771       
772    def generate_ssh_keys(self, dest, type="rsa" ):
773        """
774        Generate a set of keys for the gateways to use to talk.
775
776        Keys are of type type and are stored in the required dest file.
777        """
778        valid_types = ("rsa", "dsa")
779        t = type.lower();
780        if t not in valid_types: raise ValueError
781        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
782
783        try:
784            trace = open("/dev/null", "w")
785        except IOError:
786            raise service_error(service_error.internal,
787                    "Cannot open /dev/null??");
788
789        # May raise CalledProcessError
790        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
791        rv = call(cmd, stdout=trace, stderr=trace)
792        if rv != 0:
793            raise service_error(service_error.internal, 
794                    "Cannot generate nonce ssh keys.  %s return code %d" \
795                            % (self.ssh_keygen, rv))
796
797    def gentopo(self, str):
798        """
799        Generate the topology dtat structure from the splitter's XML
800        representation of it.
801
802        The topology XML looks like:
803            <experiment>
804                <nodes>
805                    <node><vname></vname><ips>ip1:ip2</ips></node>
806                </nodes>
807                <lans>
808                    <lan>
809                        <vname></vname><vnode></vnode><ip></ip>
810                        <bandwidth></bandwidth><member>node:port</member>
811                    </lan>
812                </lans>
813        """
814        class topo_parse:
815            """
816            Parse the topology XML and create the dats structure.
817            """
818            def __init__(self):
819                # Typing of the subelements for data conversion
820                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
821                self.int_subelements = ( 'bandwidth',)
822                self.float_subelements = ( 'delay',)
823                # The final data structure
824                self.nodes = [ ]
825                self.lans =  [ ]
826                self.topo = { \
827                        'node': self.nodes,\
828                        'lan' : self.lans,\
829                    }
830                self.element = { }  # Current element being created
831                self.chars = ""     # Last text seen
832
833            def end_element(self, name):
834                # After each sub element the contents is added to the current
835                # element or to the appropriate list.
836                if name == 'node':
837                    self.nodes.append(self.element)
838                    self.element = { }
839                elif name == 'lan':
840                    self.lans.append(self.element)
841                    self.element = { }
842                elif name in self.str_subelements:
843                    self.element[name] = self.chars
844                    self.chars = ""
845                elif name in self.int_subelements:
846                    self.element[name] = int(self.chars)
847                    self.chars = ""
848                elif name in self.float_subelements:
849                    self.element[name] = float(self.chars)
850                    self.chars = ""
851
852            def found_chars(self, data):
853                self.chars += data.rstrip()
854
855
856        tp = topo_parse();
857        parser = xml.parsers.expat.ParserCreate()
858        parser.EndElementHandler = tp.end_element
859        parser.CharacterDataHandler = tp.found_chars
860
861        parser.Parse(str)
862
863        return tp.topo
864       
865
866    def genviz(self, topo):
867        """
868        Generate the visualization the virtual topology
869        """
870
871        neato = "/usr/local/bin/neato"
872        # These are used to parse neato output and to create the visualization
873        # file.
874        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
875        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
876                "%s</type></node>"
877
878        try:
879            # Node names
880            nodes = [ n['vname'] for n in topo['node'] ]
881            topo_lans = topo['lan']
882        except KeyError:
883            raise service_error(service_error.internal, "Bad topology")
884
885        lans = { }
886        links = { }
887
888        # Walk through the virtual topology, organizing the connections into
889        # 2-node connections (links) and more-than-2-node connections (lans).
890        # When a lan is created, it's added to the list of nodes (there's a
891        # node in the visualization for the lan).
892        for l in topo_lans:
893            if links.has_key(l['vname']):
894                if len(links[l['vname']]) < 2:
895                    links[l['vname']].append(l['vnode'])
896                else:
897                    nodes.append(l['vname'])
898                    lans[l['vname']] = links[l['vname']]
899                    del links[l['vname']]
900                    lans[l['vname']].append(l['vnode'])
901            elif lans.has_key(l['vname']):
902                lans[l['vname']].append(l['vnode'])
903            else:
904                links[l['vname']] = [ l['vnode'] ]
905
906
907        # Open up a temporary file for dot to turn into a visualization
908        try:
909            df, dotname = tempfile.mkstemp()
910            dotfile = os.fdopen(df, 'w')
911        except IOError:
912            raise service_error(service_error.internal,
913                    "Failed to open file in genviz")
914
915        # Generate a dot/neato input file from the links, nodes and lans
916        try:
917            print >>dotfile, "graph G {"
918            for n in nodes:
919                print >>dotfile, '\t"%s"' % n
920            for l in links.keys():
921                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
922            for l in lans.keys():
923                for n in lans[l]:
924                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
925            print >>dotfile, "}"
926            dotfile.close()
927        except TypeError:
928            raise service_error(service_error.internal,
929                    "Single endpoint link in vtopo")
930        except IOError:
931            raise service_error(service_error.internal, "Cannot write dot file")
932
933        # Use dot to create a visualization
934        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
935                '-Gpack=true', dotname], stdout=PIPE)
936
937        # Translate dot to vis format
938        vis_nodes = [ ]
939        vis = { 'node': vis_nodes }
940        for line in dot.stdout:
941            m = vis_re.match(line)
942            if m:
943                vn = m.group(1)
944                vis_node = {'name': vn, \
945                        'x': float(m.group(2)),\
946                        'y' : float(m.group(3)),\
947                    }
948                if vn in links.keys() or vn in lans.keys():
949                    vis_node['type'] = 'lan'
950                else:
951                    vis_node['type'] = 'node'
952                vis_nodes.append(vis_node)
953        rv = dot.wait()
954
955        os.remove(dotname)
956        if rv == 0 : return vis
957        else: return None
958
959    def get_access(self, tb, nodes, user, tbparam, master, export_project,
960            access_user):
961        """
962        Get access to testbed through fedd and set the parameters for that tb
963        """
964        uri = self.tbmap.get(tb, None)
965        if not uri:
966            raise service_error(serice_error.server_config, 
967                    "Unknown testbed: %s" % tb)
968
969        # currently this lumps all users into one service access group
970        service_keys = [ a for u in user \
971                for a in u.get('access', []) \
972                    if a.has_key('sshPubkey')]
973
974        if len(service_keys) == 0:
975            raise service_error(service_error.req, 
976                    "Must have at least one SSH pubkey for services")
977
978
979        for p, u in access_user:
980            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
981                    "to %s") %  ((p or "None"), u, uri))
982
983            if p:
984                # Request with user and project specified
985                req = {\
986                        'destinationTestbed' : { 'uri' : uri },
987                        'project': { 
988                            'name': {'localname': p},
989                            'user': [ {'userID': { 'localname': u } } ],
990                            },
991                        'user':  user,
992                        'allocID' : { 'localname': 'test' },
993                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
994                        'serviceAccess' : service_keys
995                    }
996            else:
997                # Request with only user specified
998                req = {\
999                        'destinationTestbed' : { 'uri' : uri },
1000                        'user':  [ {'userID': { 'localname': u } } ],
1001                        'allocID' : { 'localname': 'test' },
1002                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1003                        'serviceAccess' : service_keys
1004                    }
1005
1006            if tb == master:
1007                # NB, the export_project parameter is a dict that includes
1008                # the type
1009                req['exportProject'] = export_project
1010
1011            # node resources if any
1012            if nodes != None and len(nodes) > 0:
1013                rnodes = [ ]
1014                for n in nodes:
1015                    rn = { }
1016                    image, hw, count = n.split(":")
1017                    if image: rn['image'] = [ image ]
1018                    if hw: rn['hardware'] = [ hw ]
1019                    if count and int(count) >0 : rn['count'] = int(count)
1020                    rnodes.append(rn)
1021                req['resources']= { }
1022                req['resources']['node'] = rnodes
1023
1024            try:
1025                if self.local_access.has_key(uri):
1026                    # Local access call
1027                    req = { 'RequestAccessRequestBody' : req }
1028                    r = self.local_access[uri].RequestAccess(req, 
1029                            fedid(file=self.cert_file))
1030                    r = { 'RequestAccessResponseBody' : r }
1031                else:
1032                    r = self.call_RequestAccess(uri, req, 
1033                            self.cert_file, self.cert_pwd, self.trusted_certs)
1034            except service_error, e:
1035                if e.code == service_error.access:
1036                    self.log.debug("[get_access] Access denied")
1037                    r = None
1038                    continue
1039                else:
1040                    raise e
1041
1042            if r.has_key('RequestAccessResponseBody'):
1043                # Through to here we have a valid response, not a fault.
1044                # Access denied is a fault, so something better or worse than
1045                # access denied has happened.
1046                r = r['RequestAccessResponseBody']
1047                self.log.debug("[get_access] Access granted")
1048                break
1049            else:
1050                raise service_error(service_error.protocol,
1051                        "Bad proxy response")
1052       
1053        if not r:
1054            raise service_error(service_error.access, 
1055                    "Access denied by %s (%s)" % (tb, uri))
1056
1057        e = r['emulab']
1058        p = e['project']
1059        tbparam[tb] = { 
1060                "boss": e['boss'],
1061                "host": e['ops'],
1062                "domain": e['domain'],
1063                "fs": e['fileServer'],
1064                "eventserver": e['eventServer'],
1065                "project": unpack_id(p['name']),
1066                "emulab" : e,
1067                "allocID" : r['allocID'],
1068                }
1069        # Make the testbed name be the label the user applied
1070        p['testbed'] = {'localname': tb }
1071
1072        for u in p['user']:
1073            role = u.get('role', None)
1074            if role == 'experimentCreation':
1075                tbparam[tb]['user'] = unpack_id(u['userID'])
1076                break
1077        else:
1078            raise service_error(service_error.internal, 
1079                    "No createExperimentUser from %s" %tb)
1080
1081        # Add attributes to barameter space.  We don't allow attributes to
1082        # overlay any parameters already installed.
1083        for a in e['fedAttr']:
1084            try:
1085                if a['attribute'] and isinstance(a['attribute'], basestring)\
1086                        and not tbparam[tb].has_key(a['attribute'].lower()):
1087                    tbparam[tb][a['attribute'].lower()] = a['value']
1088            except KeyError:
1089                self.log.error("Bad attribute in response: %s" % a)
1090       
1091    def release_access(self, tb, aid):
1092        """
1093        Release access to testbed through fedd
1094        """
1095
1096        uri = self.tbmap.get(tb, None)
1097        if not uri:
1098            raise service_error(serice_error.server_config, 
1099                    "Unknown testbed: %s" % tb)
1100
1101        if self.local_access.has_key(uri):
1102            resp = self.local_access[uri].ReleaseAccess(\
1103                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1104                    fedid(file=self.cert_file))
1105            resp = { 'ReleaseAccessResponseBody': resp } 
1106        else:
1107            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1108                    self.cert_file, self.cert_pwd, self.trusted_certs)
1109
1110        # better error coding
1111
1112    def remote_splitter(self, uri, desc, master):
1113
1114        req = {
1115                'description' : { 'ns2description': desc },
1116                'master': master,
1117                'include_fedkit': bool(self.fedkit),
1118                'include_gatewaykit': bool(self.gatewaykit)
1119            }
1120
1121        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1122                self.trusted_certs)
1123
1124        if r.has_key('Ns2SplitResponseBody'):
1125            r = r['Ns2SplitResponseBody']
1126            if r.has_key('output'):
1127                return r['output'].splitlines()
1128            else:
1129                raise service_error(service_error.protocol, 
1130                        "Bad splitter response (no output)")
1131        else:
1132            raise service_error(service_error.protocol, "Bad splitter response")
1133       
1134    class current_testbed:
1135        """
1136        Object for collecting the current testbed description.  The testbed
1137        description is saved to a file with the local testbed variables
1138        subsittuted line by line.
1139        """
1140        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1141            def tar_list_to_string(tl):
1142                if tl is None: return None
1143
1144                rv = ""
1145                for t in tl:
1146                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1147                            (t[0], os.path.basename(t[1]))
1148                return rv
1149
1150
1151            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1152            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1153            self.current_testbed = None
1154            self.testbed_file = None
1155
1156            self.def_expstart = \
1157                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1158            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1159            self.def_gwstart = \
1160                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1161            self.def_mgwstart = \
1162                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1163            self.def_gwimage = "FBSD61-TUNNEL2";
1164            self.def_gwtype = "pc";
1165            self.def_mgwcmd = '# '
1166            self.def_mgwcmdparams = ''
1167            self.def_gwcmd = '# '
1168            self.def_gwcmdparams = ''
1169
1170            self.eid = eid
1171            self.tmpdir = tmpdir
1172            # Convert fedkit and gateway kit (which are lists of tuples) into a
1173            # substituition string.
1174            self.fedkit = tar_list_to_string(fedkit)
1175            self.gatewaykit = tar_list_to_string(gatewaykit)
1176
1177        def __call__(self, line, master, allocated, tbparams):
1178            # Capture testbed topology descriptions
1179            if self.current_testbed == None:
1180                m = self.begin_testbed.match(line)
1181                if m != None:
1182                    self.current_testbed = m.group(1)
1183                    if self.current_testbed == None:
1184                        raise service_error(service_error.req,
1185                                "Bad request format (unnamed testbed)")
1186                    allocated[self.current_testbed] = \
1187                            allocated.get(self.current_testbed,0) + 1
1188                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1189                    if not os.path.exists(tb_dir):
1190                        try:
1191                            os.mkdir(tb_dir)
1192                        except IOError:
1193                            raise service_error(service_error.internal,
1194                                    "Cannot create %s" % tb_dir)
1195                    try:
1196                        self.testbed_file = open("%s/%s.%s.tcl" %
1197                                (tb_dir, self.eid, self.current_testbed), 'w')
1198                    except IOError:
1199                        self.testbed_file = None
1200                    return True
1201                else: return False
1202            else:
1203                m = self.end_testbed.match(line)
1204                if m != None:
1205                    if m.group(1) != self.current_testbed:
1206                        raise service_error(service_error.internal, 
1207                                "Mismatched testbed markers!?")
1208                    if self.testbed_file != None: 
1209                        self.testbed_file.close()
1210                        self.testbed_file = None
1211                    self.current_testbed = None
1212                elif self.testbed_file:
1213                    # Substitute variables and put the line into the local
1214                    # testbed file.
1215                    gwtype = tbparams[self.current_testbed].get(\
1216                            'connectortype', self.def_gwtype)
1217                    gwimage = tbparams[self.current_testbed].get(\
1218                            'connectorimage', self.def_gwimage)
1219                    mgwstart = tbparams[self.current_testbed].get(\
1220                            'masterconnectorstartcmd', self.def_mgwstart)
1221                    mexpstart = tbparams[self.current_testbed].get(\
1222                            'masternodestartcmd', self.def_mexpstart)
1223                    gwstart = tbparams[self.current_testbed].get(\
1224                            'slaveconnectorstartcmd', self.def_gwstart)
1225                    expstart = tbparams[self.current_testbed].get(\
1226                            'slavenodestartcmd', self.def_expstart)
1227                    project = tbparams[self.current_testbed].get('project')
1228                    gwcmd = tbparams[self.current_testbed].get(\
1229                            'slaveconnectorcmd', self.def_gwcmd)
1230                    gwcmdparams = tbparams[self.current_testbed].get(\
1231                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1232                    mgwcmd = tbparams[self.current_testbed].get(\
1233                            'masterconnectorcmd', self.def_gwcmd)
1234                    mgwcmdparams = tbparams[self.current_testbed].get(\
1235                            'masterconnectorcmdparams', self.def_gwcmdparams)
1236                    line = re.sub("GWTYPE", gwtype, line)
1237                    line = re.sub("GWIMAGE", gwimage, line)
1238                    if self.current_testbed == master:
1239                        line = re.sub("GWSTART", mgwstart, line)
1240                        line = re.sub("EXPSTART", mexpstart, line)
1241                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1242                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1243                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1244                    else:
1245                        line = re.sub("GWSTART", gwstart, line)
1246                        line = re.sub("EXPSTART", expstart, line)
1247                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1248                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1249                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1250                    #These expansions contain EID and PROJDIR.  NB these are
1251                    # local fedkit and gatewaykit, which are strings.
1252                    if self.fedkit:
1253                        line = re.sub("FEDKIT", self.fedkit, line)
1254                    if self.gatewaykit:
1255                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1256                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1257                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1258                    line = re.sub("EID", self.eid, line)
1259                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1260                            (project, self.eid), line)
1261                    print >>self.testbed_file, line
1262                return True
1263
1264    class allbeds:
1265        """
1266        Process the Allbeds section.  Get access to each federant and save the
1267        parameters in tbparams
1268        """
1269        def __init__(self, get_access):
1270            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1271            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1272            self.in_allbeds = False
1273            self.get_access = get_access
1274
1275        def __call__(self, line, user, tbparams, master, export_project,
1276                access_user):
1277            # Testbed access parameters
1278            if not self.in_allbeds:
1279                if self.begin_allbeds.match(line):
1280                    self.in_allbeds = True
1281                    return True
1282                else:
1283                    return False
1284            else:
1285                if self.end_allbeds.match(line):
1286                    self.in_allbeds = False
1287                else:
1288                    nodes = line.split('|')
1289                    tb = nodes.pop(0)
1290                    self.get_access(tb, nodes, user, tbparams, master,
1291                            export_project, access_user)
1292                return True
1293
1294    class gateways:
1295        def __init__(self, eid, master, tmpdir, gw_pubkey,
1296                gw_secretkey, copy_file, fedkit):
1297            self.begin_gateways = \
1298                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1299            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1300            self.current_gateways = None
1301            self.control_gateway = None
1302            self.active_end = { }
1303
1304            self.eid = eid
1305            self.master = master
1306            self.tmpdir = tmpdir
1307            self.gw_pubkey_base = gw_pubkey
1308            self.gw_secretkey_base = gw_secretkey
1309
1310            self.copy_file = copy_file
1311            self.fedkit = fedkit
1312
1313
1314        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1315                active_end, tbparams, dtb, myname, desthost, type):
1316            """
1317            Produce a gateway configuration file from a gateways line.
1318            """
1319
1320            sproject = tbparams[gw].get('project', 'project')
1321            dproject = tbparams[dtb].get('project', 'project')
1322            sdomain = ".%s.%s%s" % (eid, sproject,
1323                    tbparams[gw].get('domain', ".example.com"))
1324            ddomain = ".%s.%s%s" % (eid, dproject,
1325                    tbparams[dtb].get('domain', ".example.com"))
1326            boss = tbparams[master].get('boss', "boss")
1327            fs = tbparams[master].get('fs', "fs")
1328            event_server = "%s%s" % \
1329                    (tbparams[gw].get('eventserver', "event_server"),
1330                            tbparams[gw].get('domain', "example.com"))
1331            remote_event_server = "%s%s" % \
1332                    (tbparams[dtb].get('eventserver', "event_server"),
1333                            tbparams[dtb].get('domain', "example.com"))
1334            seer_control = "%s%s" % \
1335                    (tbparams[gw].get('control', "control"), sdomain)
1336            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1337
1338            if self.fedkit:
1339                remote_script_dir = "/usr/local/federation/bin"
1340                local_script_dir = "/usr/local/federation/bin"
1341            else:
1342                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1343                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1344
1345            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1346            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1347            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1348
1349            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1350            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1351
1352            # translate to lower case so the `hostname` hack for specifying
1353            # configuration files works.
1354            conf_file = conf_file.lower();
1355            remote_conf_file = remote_conf_file.lower();
1356
1357            if dtb == master:
1358                active = "false"
1359            elif gw == master:
1360                active = "true"
1361            elif active_end.has_key('%s-%s' % (dtb, gw)):
1362                active = "false"
1363            else:
1364                active_end['%s-%s' % (gw, dtb)] = 1
1365                active = "true"
1366
1367            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1368            print >>gwconfig, "Active: %s" % active
1369            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1370            if tunnel_iface:
1371                print >>gwconfig, "Interface: %s" % tunnel_iface
1372            print >>gwconfig, "BossName: %s" % boss
1373            print >>gwconfig, "FsName: %s" % fs
1374            print >>gwconfig, "EventServerName: %s" % event_server
1375            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1376            print >>gwconfig, "SeerControl: %s" % seer_control
1377            print >>gwconfig, "Type: %s" % type
1378            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1379            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1380                    local_script_dir
1381            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1382            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1383            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1384                    (remote_conf_dir, remote_conf_file)
1385            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1386            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1387            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1388            gwconfig.close()
1389
1390            return active == "true"
1391
1392        def __call__(self, line, allocated, tbparams):
1393            # Process gateways
1394            if not self.current_gateways:
1395                m = self.begin_gateways.match(line)
1396                if m:
1397                    self.current_gateways = m.group(1)
1398                    if allocated.has_key(self.current_gateways):
1399                        # This test should always succeed
1400                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1401                        if not os.path.exists(tb_dir):
1402                            try:
1403                                os.mkdir(tb_dir)
1404                            except IOError:
1405                                raise service_error(service_error.internal,
1406                                        "Cannot create %s" % tb_dir)
1407                    else:
1408                        # XXX
1409                        self.log.error("[gateways]: Ignoring gateways for " + \
1410                                "unknown testbed %s" % self.current_gateways)
1411                        self.current_gateways = None
1412                    return True
1413                else:
1414                    return False
1415            else:
1416                m = self.end_gateways.match(line)
1417                if m :
1418                    if m.group(1) != self.current_gateways:
1419                        raise service_error(service_error.internal,
1420                                "Mismatched gateway markers!?")
1421                    if self.control_gateway:
1422                        try:
1423                            cc = open("%s/%s/client.conf" %
1424                                    (self.tmpdir, self.current_gateways), 'w')
1425                            print >>cc, "ControlGateway: %s" % \
1426                                    self.control_gateway
1427                            if tbparams[self.master].has_key('smbshare'):
1428                                print >>cc, "SMBSHare: %s" % \
1429                                        tbparams[self.master]['smbshare']
1430                            print >>cc, "ProjectUser: %s" % \
1431                                    tbparams[self.master]['user']
1432                            print >>cc, "ProjectName: %s" % \
1433                                    tbparams[self.master]['project']
1434                            print >>cc, "ExperimentID: %s/%s" % \
1435                                    ( tbparams[self.master]['project'], \
1436                                    self.eid )
1437                            cc.close()
1438                        except IOError:
1439                            raise service_error(service_error.internal,
1440                                    "Error creating client config")
1441                        # XXX: This seer specific file should disappear
1442                        try:
1443                            cc = open("%s/%s/seer.conf" %
1444                                    (self.tmpdir, self.current_gateways),
1445                                    'w')
1446                            if self.current_gateways != self.master:
1447                                print >>cc, "ControlNode: %s" % \
1448                                        self.control_gateway
1449                            print >>cc, "ExperimentID: %s/%s" % \
1450                                    ( tbparams[self.master]['project'], \
1451                                    self.eid )
1452                            cc.close()
1453                        except IOError:
1454                            raise service_error(service_error.internal,
1455                                    "Error creating seer config")
1456                    else:
1457                        debug.error("[gateways]: No control gateway for %s" %\
1458                                    self.current_gateways)
1459                    self.current_gateways = None
1460                else:
1461                    dtb, myname, desthost, type = line.split(" ")
1462
1463                    if type == "control" or type == "both":
1464                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1465                                self.eid, 
1466                                tbparams[self.current_gateways]['project'],
1467                                tbparams[self.current_gateways]['domain'])
1468                    try:
1469                        active = self.gateway_conf_file(self.current_gateways,
1470                                self.master, self.eid, self.gw_pubkey_base,
1471                                self.gw_secretkey_base,
1472                                self.active_end, tbparams, dtb, myname,
1473                                desthost, type)
1474                    except IOError, e:
1475                        raise service_error(service_error.internal,
1476                                "Failed to write config file for %s" % \
1477                                        self.current_gateway)
1478           
1479                    gw_pubkey = "%s/keys/%s" % \
1480                            (self.tmpdir, self.gw_pubkey_base)
1481                    gw_secretkey = "%s/keys/%s" % \
1482                            (self.tmpdir, self.gw_secretkey_base)
1483
1484                    pkfile = "%s/%s/%s" % \
1485                            ( self.tmpdir, self.current_gateways, 
1486                                    self.gw_pubkey_base)
1487                    skfile = "%s/%s/%s" % \
1488                            ( self.tmpdir, self.current_gateways, 
1489                                    self.gw_secretkey_base)
1490
1491                    if not os.path.exists(pkfile):
1492                        try:
1493                            self.copy_file(gw_pubkey, pkfile)
1494                        except IOError:
1495                            service_error(service_error.internal,
1496                                    "Failed to copy pubkey file")
1497
1498                    if active and not os.path.exists(skfile):
1499                        try:
1500                            self.copy_file(gw_secretkey, skfile)
1501                        except IOError:
1502                            service_error(service_error.internal,
1503                                    "Failed to copy secretkey file")
1504                return True
1505
1506    class shunt_to_file:
1507        """
1508        Simple class to write data between two regexps to a file.
1509        """
1510        def __init__(self, begin, end, filename):
1511            """
1512            Begin shunting on a match of begin, stop on end, send data to
1513            filename.
1514            """
1515            self.begin = re.compile(begin)
1516            self.end = re.compile(end)
1517            self.in_shunt = False
1518            self.file = None
1519            self.filename = filename
1520
1521        def __call__(self, line):
1522            """
1523            Call this on each line in the input that may be shunted.
1524            """
1525            if not self.in_shunt:
1526                if self.begin.match(line):
1527                    self.in_shunt = True
1528                    try:
1529                        self.file = open(self.filename, "w")
1530                    except:
1531                        self.file = None
1532                        raise
1533                    return True
1534                else:
1535                    return False
1536            else:
1537                if self.end.match(line):
1538                    if self.file: 
1539                        self.file.close()
1540                        self.file = None
1541                    self.in_shunt = False
1542                else:
1543                    if self.file:
1544                        print >>self.file, line
1545                return True
1546
1547    class shunt_to_list:
1548        """
1549        Same interface as shunt_to_file.  Data collected in self.list, one list
1550        element per line.
1551        """
1552        def __init__(self, begin, end):
1553            self.begin = re.compile(begin)
1554            self.end = re.compile(end)
1555            self.in_shunt = False
1556            self.list = [ ]
1557       
1558        def __call__(self, line):
1559            if not self.in_shunt:
1560                if self.begin.match(line):
1561                    self.in_shunt = True
1562                    return True
1563                else:
1564                    return False
1565            else:
1566                if self.end.match(line):
1567                    self.in_shunt = False
1568                else:
1569                    self.list.append(line)
1570                return True
1571
1572    class shunt_to_string:
1573        """
1574        Same interface as shunt_to_file.  Data collected in self.str, all in
1575        one string.
1576        """
1577        def __init__(self, begin, end):
1578            self.begin = re.compile(begin)
1579            self.end = re.compile(end)
1580            self.in_shunt = False
1581            self.str = ""
1582       
1583        def __call__(self, line):
1584            if not self.in_shunt:
1585                if self.begin.match(line):
1586                    self.in_shunt = True
1587                    return True
1588                else:
1589                    return False
1590            else:
1591                if self.end.match(line):
1592                    self.in_shunt = False
1593                else:
1594                    self.str += line
1595                return True
1596
1597    def create_experiment(self, req, fid):
1598        """
1599        The external interface to experiment creation called from the
1600        dispatcher.
1601
1602        Creates a working directory, splits the incoming description using the
1603        splitter script and parses out the avrious subsections using the
1604        lcasses above.  Once each sub-experiment is created, use pooled threads
1605        to instantiate them and start it all up.
1606        """
1607
1608        if not self.auth.check_attribute(fid, 'create'):
1609            raise service_error(service_error.access, "Create access denied")
1610
1611        try:
1612            tmpdir = tempfile.mkdtemp(prefix="split-")
1613        except IOError:
1614            raise service_error(service_error.internal, "Cannot create tmp dir")
1615
1616        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1617        gw_secretkey_base = "fed.%s" % self.ssh_type
1618        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1619        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1620        tclfile = tmpdir + "/experiment.tcl"
1621        tbparams = { }
1622        try:
1623            access_user = self.accessdb[fid]
1624        except KeyError:
1625            raise service_error(service_error.internal,
1626                    "Access map and authorizer out of sync in " + \
1627                            "create_experiment for fedid %s"  % fid)
1628
1629        pid = "dummy"
1630        gid = "dummy"
1631        # XXX
1632        fail_soft = False
1633
1634        try:
1635            os.mkdir(tmpdir+"/keys")
1636        except OSError:
1637            raise service_error(service_error.internal,
1638                    "Can't make temporary dir")
1639
1640        req = req.get('CreateRequestBody', None)
1641        if not req:
1642            raise service_error(service_error.req,
1643                    "Bad request format (no CreateRequestBody)")
1644        # The tcl parser needs to read a file so put the content into that file
1645        descr=req.get('experimentdescription', None)
1646        if descr:
1647            file_content=descr.get('ns2description', None)
1648            if file_content:
1649                try:
1650                    f = open(tclfile, 'w')
1651                    f.write(file_content)
1652                    f.close()
1653                except IOError:
1654                    raise service_error(service_error.internal,
1655                            "Cannot write temp experiment description")
1656            else:
1657                raise service_error(service_error.req, 
1658                        "Only ns2descriptions supported")
1659        else:
1660            raise service_error(service_error.req, "No experiment description")
1661
1662        if req.has_key('experimentID') and \
1663                req['experimentID'].has_key('localname'):
1664            eid = req['experimentID']['localname']
1665            self.state_lock.acquire()
1666            while (self.state.has_key(eid)):
1667                eid += random.choice(string.ascii_letters)
1668            # To avoid another thread picking this localname
1669            self.state[eid] = "placeholder"
1670            self.state_lock.release()
1671        else:
1672            eid = self.exp_stem
1673            for i in range(0,5):
1674                eid += random.choice(string.ascii_letters)
1675            self.state_lock.acquire()
1676            while (self.state.has_key(eid)):
1677                eid = self.exp_stem
1678                for i in range(0,5):
1679                    eid += random.choice(string.ascii_letters)
1680            # To avoid another thread picking this localname
1681            self.state[eid] = "placeholder"
1682            self.state_lock.release()
1683
1684        try: 
1685            # This catches exceptions to clear the placeholder if necessary
1686            try:
1687                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1688            except ValueError:
1689                raise service_error(service_error.server_config, 
1690                        "Bad key type (%s)" % self.ssh_type)
1691
1692            user = req.get('user', None)
1693            if user == None:
1694                raise service_error(service_error.req, "No user")
1695
1696            master = req.get('master', None)
1697            if not master:
1698                raise service_error(service_error.req,
1699                        "No master testbed label")
1700            export_project = req.get('exportProject', None)
1701            if not export_project:
1702                raise service_error(service_error.req, "No export project")
1703           
1704            if self.splitter_url:
1705                self.log.debug("Calling remote splitter at %s" % \
1706                        self.splitter_url)
1707                split_data = self.remote_splitter(self.splitter_url,
1708                        file_content, master)
1709            else:
1710                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1711                    str(self.muxmax), '-m', master]
1712
1713                if self.fedkit:
1714                    tclcmd.append('-k')
1715
1716                if self.gatewaykit:
1717                    tclcmd.append('-K')
1718
1719                tclcmd.extend([pid, gid, eid, tclfile])
1720
1721                self.log.debug("running local splitter %s", " ".join(tclcmd))
1722                tclparser = Popen(tclcmd, stdout=PIPE)
1723                split_data = tclparser.stdout
1724
1725            allocated = { }         # Testbeds we can access
1726            started = { }           # Testbeds where a sub-experiment started
1727                                # successfully
1728
1729            # Objects to parse the splitter output (defined above)
1730            parse_current_testbed = self.current_testbed(eid, tmpdir,
1731                    self.fedkit, self.gatewaykit)
1732            parse_allbeds = self.allbeds(self.get_access)
1733            parse_gateways = self.gateways(eid, master, tmpdir,
1734                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1735                    self.fedkit)
1736            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1737                        "^#\s+End\s+Vtopo")
1738            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1739                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1740            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1741                    "^#\s+End\s+tarfiles")
1742            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1743                    "^#\s+End\s+rpms")
1744
1745            # Working on the split data
1746            for line in split_data:
1747                line = line.rstrip()
1748                if parse_current_testbed(line, master, allocated, tbparams):
1749                    continue
1750                elif parse_allbeds(line, user, tbparams, master, export_project,
1751                        access_user):
1752                    continue
1753                elif parse_gateways(line, allocated, tbparams):
1754                    continue
1755                elif parse_vtopo(line):
1756                    continue
1757                elif parse_hostnames(line):
1758                    continue
1759                elif parse_tarfiles(line):
1760                    continue
1761                elif parse_rpms(line):
1762                    continue
1763                else:
1764                    raise service_error(service_error.internal, 
1765                            "Bad tcl parse? %s" % line)
1766            # Virtual topology and visualization
1767            vtopo = self.gentopo(parse_vtopo.str)
1768            if not vtopo:
1769                raise service_error(service_error.internal, 
1770                        "Failed to generate virtual topology")
1771
1772            vis = self.genviz(vtopo)
1773            if not vis:
1774                raise service_error(service_error.internal, 
1775                        "Failed to generate visualization")
1776           
1777            # save federant information
1778            for k in allocated.keys():
1779                tbparams[k]['federant'] = {\
1780                        'name': [ { 'localname' : eid} ],\
1781                        'emulab': tbparams[k]['emulab'],\
1782                        'allocID' : tbparams[k]['allocID'],\
1783                        'master' : k == master,\
1784                    }
1785
1786
1787            # Copy tarfiles and rpms needed at remote sites into a staging area
1788            try:
1789                if self.fedkit:
1790                    for t in self.fedkit:
1791                        parse_tarfiles.list.append(t[1])
1792                if self.gatewaykit:
1793                    for t in self.gatewaykit:
1794                        parse_tarfiles.list.append(t[1])
1795                for t in parse_tarfiles.list:
1796                    if not os.path.exists("%s/tarfiles" % tmpdir):
1797                        os.mkdir("%s/tarfiles" % tmpdir)
1798                    self.copy_file(t, "%s/tarfiles/%s" % \
1799                            (tmpdir, os.path.basename(t)))
1800                for r in parse_rpms.list:
1801                    if not os.path.exists("%s/rpms" % tmpdir):
1802                        os.mkdir("%s/rpms" % tmpdir)
1803                    self.copy_file(r, "%s/rpms/%s" % \
1804                            (tmpdir, os.path.basename(r)))
1805                # A null experiment file in case we need to create a remote
1806                # experiment from scratch
1807                f = open("%s/null.tcl" % tmpdir, "w")
1808                print >>f, """
1809set ns [new Simulator]
1810source tb_compat.tcl
1811
1812set a [$ns node]
1813
1814$ns rtproto Session
1815$ns run
1816"""
1817                f.close()
1818
1819            except IOError, e:
1820                raise service_error(service_error.internal, 
1821                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1822
1823        except service_error, e:
1824            # If something goes wrong in the parse (usually an access error)
1825            # clear the placeholder state.  From here on out the code delays
1826            # exceptions.
1827            self.state_lock.acquire()
1828            del self.state[eid]
1829            self.state_lock.release()
1830            raise e
1831
1832        thread_pool = self.thread_pool(self.nthreads)
1833        threads = [ ]
1834
1835        for tb in [ k for k in allocated.keys() if k != master]:
1836            # Create and start a thread to start the segment, and save it to
1837            # get the return value later
1838            thread_pool.wait_for_slot()
1839            t  = self.pooled_thread(target=self.start_segment, 
1840                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1841                    pdata=thread_pool, trace_file=self.trace_file)
1842            threads.append(t)
1843            t.start()
1844
1845        # Wait until all finish
1846        thread_pool.wait_for_all_done()
1847
1848        # If none failed, start the master
1849        failed = [ t.getName() for t in threads if not t.rv ]
1850
1851        if len(failed) == 0:
1852            if not self.start_segment(master, eid, tbparams, tmpdir):
1853                failed.append(master)
1854
1855        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1856        # If one failed clean up, unless fail_soft is set
1857        if failed:
1858            if not fail_soft:
1859                thread_pool.clear()
1860                for tb in succeeded:
1861                    # Create and start a thread to stop the segment
1862                    thread_pool.wait_for_slot()
1863                    t  = self.pooled_thread(target=self.stop_segment, 
1864                            args=(tb, eid, tbparams), name=tb,
1865                            pdata=thread_pool, trace_file=self.trace_file)
1866                    t.start()
1867                # Wait until all finish
1868                thread_pool.wait_for_all_done()
1869
1870                # release the allocations
1871                for tb in tbparams.keys():
1872                    self.release_access(tb, tbparams[tb]['allocID'])
1873                # Remove the placeholder
1874                self.state_lock.acquire()
1875                del self.state[eid]
1876                self.state_lock.release()
1877
1878                raise service_error(service_error.federant,
1879                    "Swap in failed on %s" % ",".join(failed))
1880        else:
1881            self.log.info("[start_segment]: Experiment %s started" % eid)
1882
1883        # Generate an ID for the experiment (slice) and a certificate that the
1884        # allocator can use to prove they own it.  We'll ship it back through
1885        # the encrypted connection.
1886        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1887
1888        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1889
1890        # Walk up tmpdir, deleting as we go
1891        for path, dirs, files in os.walk(tmpdir, topdown=False):
1892            for f in files:
1893                os.remove(os.path.join(path, f))
1894            for d in dirs:
1895                os.rmdir(os.path.join(path, d))
1896        os.rmdir(tmpdir)
1897
1898        # The deepcopy prevents the allocation ID and other binaries from being
1899        # translated into other formats
1900        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1901                for tb in tbparams.keys() \
1902                    if tbparams[tb].has_key('federant') ],\
1903                    'vtopo': vtopo,\
1904                    'vis' : vis,
1905                    'experimentID' : [\
1906                            { 'fedid': copy.copy(expid) }, \
1907                            { 'localname': eid },\
1908                        ],\
1909                    'experimentAccess': { 'X509' : expcert },\
1910                }
1911        # remove the allocationID info from each federant
1912        for f in resp['federant']:
1913            if f.has_key('allocID'): del f['allocID']
1914
1915        # Insert the experiment into our state and update the disk copy
1916        self.state_lock.acquire()
1917        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1918                for tb in tbparams.keys() \
1919                    if tbparams[tb].has_key('federant') ],\
1920                    'vtopo': vtopo,\
1921                    'vis' : vis,
1922                    'owner': fid,
1923                    'experimentID' : [\
1924                            { 'fedid': expid }, { 'localname': eid },\
1925                        ],\
1926                }
1927        self.state[eid] = self.state[expid]
1928        if self.state_filename: self.write_state()
1929        self.state_lock.release()
1930
1931        self.auth.set_attribute(fid, expid)
1932        self.auth.set_attribute(expid, expid)
1933
1934        if not failed:
1935            return resp
1936        else:
1937            raise service_error(service_error.partial, \
1938                    "Partial swap in on %s" % ",".join(succeeded))
1939
1940    def check_experiment_access(self, fid, key):
1941        """
1942        Confirm that the fid has access to the experiment.  Though a request
1943        may be made in terms of a local name, the access attribute is always
1944        the experiment's fedid.
1945        """
1946        if not isinstance(key, fedid):
1947            self.state_lock.acquire()
1948            if self.state.has_key(key):
1949                if isinstance(self.state[key], dict):
1950                    try:
1951                        kl = [ f['fedid'] for f in \
1952                                self.state[key]['experimentID']\
1953                                    if f.has_key('fedid') ]
1954                    except KeyError:
1955                        self.state_lock.release()
1956                        raise service_error(service_error.internal, 
1957                                "No fedid for experiment %s when checking " +\
1958                                        "access(!?)" % key)
1959                    if len(kl) == 1:
1960                        key = kl[0]
1961                    else:
1962                        self.state_lock.release()
1963                        raise service_error(service_error.internal, 
1964                                "multiple fedids for experiment %s when " +\
1965                                        "checking access(!?)" % key)
1966                elif isinstance(self.state[key], str):
1967                    self.state_lock.release()
1968                    raise service_error(service_error.internal, 
1969                            ("experiment %s is placeholder.  " +\
1970                                    "Creation in progress or aborted oddly") \
1971                                    % key)
1972                else:
1973                    self.state_lock.release()
1974                    raise service_error(service_error.internal, 
1975                            "Unexpected state for %s" % key)
1976
1977            else:
1978                self.state_lock.release()
1979                raise service_error(service_error.access, "Access Denied")
1980            self.state_lock.release()
1981
1982        if self.auth.check_attribute(fid, key):
1983            return True
1984        else:
1985            raise service_error(service_error.access, "Access Denied")
1986
1987
1988
1989    def get_vtopo(self, req, fid):
1990        """
1991        Return the stored virtual topology for this experiment
1992        """
1993        rv = None
1994
1995        req = req.get('VtopoRequestBody', None)
1996        if not req:
1997            raise service_error(service_error.req,
1998                    "Bad request format (no VtopoRequestBody)")
1999        exp = req.get('experiment', None)
2000        if exp:
2001            if exp.has_key('fedid'):
2002                key = exp['fedid']
2003                keytype = "fedid"
2004            elif exp.has_key('localname'):
2005                key = exp['localname']
2006                keytype = "localname"
2007            else:
2008                raise service_error(service_error.req, "Unknown lookup type")
2009        else:
2010            raise service_error(service_error.req, "No request?")
2011
2012        self.check_experiment_access(fid, key)
2013
2014        self.state_lock.acquire()
2015        if self.state.has_key(key):
2016            rv = { 'experiment' : {keytype: key },\
2017                    'vtopo': self.state[key]['vtopo'],\
2018                }
2019        self.state_lock.release()
2020
2021        if rv: return rv
2022        else: raise service_error(service_error.req, "No such experiment")
2023
2024    def get_vis(self, req, fid):
2025        """
2026        Return the stored visualization for this experiment
2027        """
2028        rv = None
2029
2030        req = req.get('VisRequestBody', None)
2031        if not req:
2032            raise service_error(service_error.req,
2033                    "Bad request format (no VisRequestBody)")
2034        exp = req.get('experiment', None)
2035        if exp:
2036            if exp.has_key('fedid'):
2037                key = exp['fedid']
2038                keytype = "fedid"
2039            elif exp.has_key('localname'):
2040                key = exp['localname']
2041                keytype = "localname"
2042            else:
2043                raise service_error(service_error.req, "Unknown lookup type")
2044        else:
2045            raise service_error(service_error.req, "No request?")
2046
2047        self.check_experiment_access(fid, key)
2048
2049        self.state_lock.acquire()
2050        if self.state.has_key(key):
2051            rv =  { 'experiment' : {keytype: key },\
2052                    'vis': self.state[key]['vis'],\
2053                    }
2054        self.state_lock.release()
2055
2056        if rv: return rv
2057        else: raise service_error(service_error.req, "No such experiment")
2058
2059    def get_info(self, req, fid):
2060        """
2061        Return all the stored info about this experiment
2062        """
2063        rv = None
2064
2065        req = req.get('InfoRequestBody', None)
2066        if not req:
2067            raise service_error(service_error.req,
2068                    "Bad request format (no VisRequestBody)")
2069        exp = req.get('experiment', None)
2070        if exp:
2071            if exp.has_key('fedid'):
2072                key = exp['fedid']
2073                keytype = "fedid"
2074            elif exp.has_key('localname'):
2075                key = exp['localname']
2076                keytype = "localname"
2077            else:
2078                raise service_error(service_error.req, "Unknown lookup type")
2079        else:
2080            raise service_error(service_error.req, "No request?")
2081
2082        self.check_experiment_access(fid, key)
2083
2084        # The state may be massaged by the service function that called
2085        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2086        # state.
2087        self.state_lock.acquire()
2088        if self.state.has_key(key):
2089            rv = copy.deepcopy(self.state[key])
2090        self.state_lock.release()
2091        # Remove the owner info
2092        del rv['owner']
2093        # remove the allocationID info from each federant
2094        for f in rv['federant']:
2095            if f.has_key('allocID'): del f['allocID']
2096
2097        if rv: return rv
2098        else: raise service_error(service_error.req, "No such experiment")
2099
2100
2101    def terminate_experiment(self, req, fid):
2102        """
2103        Swap this experiment out on the federants and delete the shared
2104        information
2105        """
2106        tbparams = { }
2107        req = req.get('TerminateRequestBody', None)
2108        if not req:
2109            raise service_error(service_error.req,
2110                    "Bad request format (no TerminateRequestBody)")
2111        exp = req.get('experiment', None)
2112        if exp:
2113            if exp.has_key('fedid'):
2114                key = exp['fedid']
2115                keytype = "fedid"
2116            elif exp.has_key('localname'):
2117                key = exp['localname']
2118                keytype = "localname"
2119            else:
2120                raise service_error(service_error.req, "Unknown lookup type")
2121        else:
2122            raise service_error(service_error.req, "No request?")
2123
2124        self.check_experiment_access(fid, key)
2125
2126        self.state_lock.acquire()
2127        fed_exp = self.state.get(key, None)
2128
2129        if fed_exp:
2130            # This branch of the conditional holds the lock to generate a
2131            # consistent temporary tbparams variable to deallocate experiments.
2132            # It releases the lock to do the deallocations and reacquires it to
2133            # remove the experiment state when the termination is complete.
2134            ids = []
2135            #  experimentID is a list of dicts that are self-describing
2136            #  identifiers.  This finds all the fedids and localnames - the
2137            #  keys of self.state - and puts them into ids.
2138            for id in fed_exp.get('experimentID', []):
2139                if id.has_key('fedid'): ids.append(id['fedid'])
2140                if id.has_key('localname'): ids.append(id['localname'])
2141
2142            # Construct enough of the tbparams to make the stop_segment calls
2143            # work
2144            for fed in fed_exp['federant']:
2145                try:
2146                    for e in fed['name']:
2147                        eid = e.get('localname', None)
2148                        if eid: break
2149                    else:
2150                        continue
2151
2152                    p = fed['emulab']['project']
2153
2154                    project = p['name']['localname']
2155                    tb = p['testbed']['localname']
2156                    user = p['user'][0]['userID']['localname']
2157
2158                    domain = fed['emulab']['domain']
2159                    host  = fed['emulab']['ops']
2160                    aid = fed['allocID']
2161                except KeyError, e:
2162                    continue
2163                tbparams[tb] = {\
2164                        'user': user,\
2165                        'domain': domain,\
2166                        'project': project,\
2167                        'host': host,\
2168                        'eid': eid,\
2169                        'aid': aid,\
2170                    }
2171            self.state_lock.release()
2172
2173            # Stop everyone.
2174            thread_pool = self.thread_pool(self.nthreads)
2175            for tb in tbparams.keys():
2176                # Create and start a thread to stop the segment
2177                thread_pool.wait_for_slot()
2178                t  = self.pooled_thread(target=self.stop_segment, 
2179                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2180                        pdata=thread_pool, trace_file=self.trace_file)
2181                t.start()
2182            # Wait for completions
2183            thread_pool.wait_for_all_done()
2184
2185            # release the allocations
2186            for tb in tbparams.keys():
2187                self.release_access(tb, tbparams[tb]['aid'])
2188
2189            # Remove the terminated experiment
2190            self.state_lock.acquire()
2191            for id in ids:
2192                if self.state.has_key(id): del self.state[id]
2193
2194            if self.state_filename: self.write_state()
2195            self.state_lock.release()
2196
2197            return { 'experiment': exp }
2198        else:
2199            # Don't forget to release the lock
2200            self.state_lock.release()
2201            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.