source: fedd/federation/experiment_control.py @ 8dfc06c

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 8dfc06c was 8dfc06c, checked in by Ted Faber <faber@…>, 15 years ago

Bug/typo in the control-only gateway code. That branch needs to be tested.

  • Property mode set to 100644
File size: 63.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167        self.thread_with_rv = experiment_control_local.pooled_thread
168        self.thread_pool = experiment_control_local.thread_pool
169
170        self.cert_file = config.get("experiment_control", "cert_file")
171        if self.cert_file:
172            self.cert_pwd = config.get("experiment_control", "cert_pwd")
173        else:
174            self.cert_file = config.get("globals", "cert_file")
175            self.cert_pwd = config.get("globals", "cert_pwd")
176
177        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
178                or config.get("globals", "trusted_certs")
179
180        self.exp_stem = "fed-stem"
181        self.log = logging.getLogger("fedd.experiment_control")
182        set_log_level(config, "experiment_control", self.log)
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193
194        self.debug = config.getboolean("experiment_control", "create_debug")
195        self.state_filename = config.get("experiment_control", 
196                "experiment_state")
197        self.splitter_url = config.get("experiment_control", "splitter_uri")
198        self.fedkit = config.get("experiment_control", "fedkit")
199        accessdb_file = config.get("experiment_control", "accessdb")
200
201        self.ssh_pubkey_file = config.get("experiment_control", 
202                "ssh_pubkey_file")
203        self.ssh_privkey_file = config.get("experiment_control",
204                "ssh_privkey_file")
205        # NB for internal master/slave ops, not experiment setup
206        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
207        self.state = { }
208        self.state_lock = Lock()
209        self.tclsh = "/usr/local/bin/otclsh"
210        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
211                config.get("experiment_control", "tcl_splitter",
212                        "/usr/testbed/lib/ns2ir/parse.tcl")
213        mapdb_file = config.get("experiment_control", "mapdb")
214        self.trace_file = sys.stderr
215
216        self.def_expstart = \
217                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
218                "/tmp/federate";
219        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
220                "FEDDIR/hosts";
221        self.def_gwstart = \
222                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
223                "/tmp/bridge.log";
224        self.def_mgwstart = \
225                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
226                "/tmp/bridge.log";
227        self.def_gwimage = "FBSD61-TUNNEL2";
228        self.def_gwtype = "pc";
229        self.local_access = { }
230
231        if auth:
232            self.auth = auth
233        else:
234            self.log.error(\
235                    "[access]: No authorizer initialized, creating local one.")
236            auth = authorizer()
237
238
239        if self.ssh_pubkey_file:
240            try:
241                f = open(self.ssh_pubkey_file, 'r')
242                self.ssh_pubkey = f.read()
243                f.close()
244            except IOError:
245                raise service_error(service_error.internal,
246                        "Cannot read sshpubkey")
247        else:
248            raise service_error(service_error.internal, 
249                    "No SSH public key file?")
250
251        if not self.ssh_privkey_file:
252            raise service_error(service_error.internal, 
253                    "No SSH public key file?")
254
255
256        if mapdb_file:
257            self.read_mapdb(mapdb_file)
258        else:
259            self.log.warn("[experiment_control] No testbed map, using defaults")
260            self.tbmap = { 
261                    'deter':'https://users.isi.deterlab.net:23235',
262                    'emulab':'https://users.isi.deterlab.net:23236',
263                    'ucb':'https://users.isi.deterlab.net:23237',
264                    }
265
266        if accessdb_file:
267                self.read_accessdb(accessdb_file)
268        else:
269            raise service_error(service_error.internal,
270                    "No accessdb specified in config")
271
272        # Grab saved state.  OK to do this w/o locking because it's read only
273        # and only one thread should be in existence that can see self.state at
274        # this point.
275        if self.state_filename:
276            self.read_state()
277
278        # Dispatch tables
279        self.soap_services = {\
280                'Create': soap_handler('Create', self.create_experiment),
281                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
282                'Vis': soap_handler('Vis', self.get_vis),
283                'Info': soap_handler('Info', self.get_info),
284                'Terminate': soap_handler('Terminate', 
285                    self.terminate_experiment),
286        }
287
288        self.xmlrpc_services = {\
289                'Create': xmlrpc_handler('Create', self.create_experiment),
290                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
291                'Vis': xmlrpc_handler('Vis', self.get_vis),
292                'Info': xmlrpc_handler('Info', self.get_info),
293                'Terminate': xmlrpc_handler('Terminate',
294                    self.terminate_experiment),
295        }
296
297    def copy_file(self, src, dest, size=1024):
298        """
299        Exceedingly simple file copy.
300        """
301        s = open(src,'r')
302        d = open(dest, 'w')
303
304        buf = "x"
305        while buf != "":
306            buf = s.read(size)
307            d.write(buf)
308        s.close()
309        d.close()
310
311    # Call while holding self.state_lock
312    def write_state(self):
313        """
314        Write a new copy of experiment state after copying the existing state
315        to a backup.
316
317        State format is a simple pickling of the state dictionary.
318        """
319        if os.access(self.state_filename, os.W_OK):
320            self.copy_file(self.state_filename, \
321                    "%s.bak" % self.state_filename)
322        try:
323            f = open(self.state_filename, 'w')
324            pickle.dump(self.state, f)
325        except IOError, e:
326            self.log.error("Can't write file %s: %s" % \
327                    (self.state_filename, e))
328        except pickle.PicklingError, e:
329            self.log.error("Pickling problem: %s" % e)
330        except TypeError, e:
331            self.log.error("Pickling problem (TypeError): %s" % e)
332
333    # Call while holding self.state_lock
334    def read_state(self):
335        """
336        Read a new copy of experiment state.  Old state is overwritten.
337
338        State format is a simple pickling of the state dictionary.
339        """
340        try:
341            f = open(self.state_filename, "r")
342            self.state = pickle.load(f)
343            self.log.debug("[read_state]: Read state from %s" % \
344                    self.state_filename)
345        except IOError, e:
346            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
347                    % (self.state_filename, e))
348        except pickle.UnpicklingError, e:
349            self.log.warning(("[read_state]: No saved state: " + \
350                    "Unpickling failed: %s") % e)
351       
352        for k in self.state.keys():
353            try:
354                # This list should only have one element in it, but phrasing it
355                # as a for loop doesn't cost much, really.  We have to find the
356                # fedid elements anyway.
357                for eid in [ f['fedid'] \
358                        for f in self.state[k]['experimentID']\
359                            if f.has_key('fedid') ]:
360                    self.auth.set_attribute(self.state[k]['owner'], eid)
361            except KeyError, e:
362                self.log.warning("[read_state]: State ownership or identity " +\
363                        "misformatted in %s: %s" % (self.state_filename, e))
364
365
366    def read_accessdb(self, accessdb_file):
367        """
368        Read the mapping from fedids that can create experiments to their name
369        in the 3-level access namespace.  All will be asserted from this
370        testbed and can include the local username and porject that will be
371        asserted on their behalf by this fedd.  Each fedid is also added to the
372        authorization system with the "create" attribute.
373        """
374        self.accessdb = {}
375        # These are the regexps for parsing the db
376        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
377        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
378                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
379        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
380                "\s*->\s*(" + name_expr + ")\s*$")
381        lineno = 0
382
383        # Parse the mappings and store in self.authdb, a dict of
384        # fedid -> (proj, user)
385        try:
386            f = open(accessdb_file, "r")
387            for line in f:
388                lineno += 1
389                line = line.strip()
390                if len(line) == 0 or line.startswith('#'):
391                    continue
392                m = project_line.match(line)
393                if m:
394                    fid = fedid(hexstr=m.group(1))
395                    project, user = m.group(2,3)
396                    if not self.accessdb.has_key(fid):
397                        self.accessdb[fid] = []
398                    self.accessdb[fid].append((project, user))
399                    continue
400
401                m = user_line.match(line)
402                if m:
403                    fid = fedid(hexstr=m.group(1))
404                    project = None
405                    user = m.group(2)
406                    if not self.accessdb.has_key(fid):
407                        self.accessdb[fid] = []
408                    self.accessdb[fid].append((project, user))
409                    continue
410                self.log.warn("[experiment_control] Error parsing access " +\
411                        "db %s at line %d" %  (accessdb_file, lineno))
412        except IOError:
413            raise service_error(service_error.internal,
414                    "Error opening/reading %s as experiment " +\
415                            "control accessdb" %  accessdb_file)
416        f.close()
417
418        # Initialize the authorization attributes
419        for fid in self.accessdb.keys():
420            self.auth.set_attribute(fid, 'create')
421
422    def read_mapdb(self, file):
423        """
424        Read a simple colon separated list of mappings for the
425        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
426        """
427
428        self.tbmap = { }
429        lineno =0
430        try:
431            f = open(file, "r")
432            for line in f:
433                lineno += 1
434                line = line.strip()
435                if line.startswith('#') or len(line) == 0:
436                    continue
437                try:
438                    label, url = line.split(':', 1)
439                    self.tbmap[label] = url
440                except ValueError, e:
441                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
442                            "map db: %s %s" % (lineno, line, e))
443        except IOError, e:
444            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
445                    "open %s: %s" % (file, e))
446        f.close()
447
448    def scp_file(self, file, user, host, dest=""):
449        """
450        scp a file to the remote host.  If debug is set the action is only
451        logged.
452        """
453
454        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
455                '-o', 'StrictHostKeyChecking yes', '-i', 
456                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
457        rv = 0
458
459        try:
460            dnull = open("/dev/null", "w")
461        except IOError:
462            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
463            dnull = Null
464
465        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
466        if not self.debug:
467            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
468
469        return rv == 0
470
471    def ssh_cmd(self, user, host, cmd, wname=None):
472        """
473        Run a remote command on host as user.  If debug is set, the action is
474        only logged.
475        """
476        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
477                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
478                (self.ssh_exec, self.ssh_privkey_file, 
479                        user, host, cmd)
480
481        try:
482            dnull = open("/dev/null", "r")
483        except IOError:
484            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
485            dnull = Null
486
487        self.log.debug("[ssh_cmd]: %s" % sh_str)
488        if not self.debug:
489            if dnull:
490                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
491            else:
492                sub = Popen(sh_str, shell=True)
493            return sub.wait() == 0
494        else:
495            return True
496
497    def ship_configs(self, host, user, src_dir, dest_dir):
498        """
499        Copy federant-specific configuration files to the federant.
500        """
501        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
502            return False
503        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
504            return False
505
506        for f in os.listdir(src_dir):
507            if os.path.isdir(f):
508                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
509                        "%s/%s" % (dest_dir, f)):
510                    return False
511            else:
512                if not self.scp_file("%s/%s" % (src_dir, f), 
513                        user, host, dest_dir):
514                    return False
515        return True
516
517    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
518        """
519        Start a sub-experiment on a federant.
520
521        Get the current state, modify or create as appropriate, ship data and
522        configs and start the experiment.  There are small ordering differences
523        based on the initial state of the sub-experiment.
524        """
525        # ops node in the federant
526        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
527        user = tbparams[tb]['user']     # federant user
528        pid = tbparams[tb]['project']   # federant project
529        # XXX
530        base_confs = ( "hosts",)
531        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
532        # command to test experiment state
533        expinfo_exec = "/usr/testbed/bin/expinfo" 
534        # Configuration directories on the remote machine
535        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
536        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
537        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
538        # Regular expressions to parse the expinfo response
539        state_re = re.compile("State:\s+(\w+)")
540        no_exp_re = re.compile("^No\s+such\s+experiment")
541        state = None    # Experiment state parsed from expinfo
542        # The expinfo ssh command.  Note the identity restriction to use only
543        # the identity provided in the pubkey given.
544        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
545                'StrictHostKeyChecking yes', '-i', 
546                self.ssh_privkey_file, "%s@%s" % (user, host), 
547                expinfo_exec, pid, eid]
548
549        # Get status
550        self.log.debug("[start_segment]: %s"% " ".join(cmd))
551        dev_null = None
552        try:
553            dev_null = open("/dev/null", "a")
554        except IOError, e:
555            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
556
557        if self.debug:
558            state = 'swapped'
559            rv = 0
560        else:
561            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
562            for line in status.stdout:
563                m = state_re.match(line)
564                if m: state = m.group(1)
565                else:
566                    m = no_exp_re.match(line)
567                    if m: state = "none"
568            rv = status.wait()
569
570        # If the experiment is not present the subcommand returns a non-zero
571        # return value.  If we successfully parsed a "none" outcome, ignore the
572        # return code.
573        if rv != 0 and state != "none":
574            raise service_error(service_error.internal,
575                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
576
577        self.log.debug("[start_segment]: %s: %s" % (tb, state))
578        self.log.info("[start_segment]:transferring experiment to %s" % tb)
579
580        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
581            return False
582        # Clear the federation config dirs
583        if not self.ssh_cmd(user, host, 
584                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
585            return False
586        # Clear and create the tarfiles and rpm directories
587        for d in (tarfiles_dir, rpms_dir):
588            if not self.ssh_cmd(user, host, 
589                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
590                return False
591            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
592                    "create tarfiles"):
593                return False
594       
595        if state == 'active':
596            # Create the federation config dirs (do not move outside the
597            # conditional.  Happens later in new expriment creation)
598            if not self.ssh_cmd(user, host, 
599                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
600                return False
601            # Remote experiment is active.  Modify it.
602            for f in base_confs:
603                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
604                        "%s/%s" % (proj_dir, f)):
605                    return False
606            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
607                    proj_dir):
608                return False
609            if os.path.isdir("%s/tarfiles" % tmpdir):
610                if not self.ship_configs(host, user,
611                        "%s/tarfiles" % tmpdir, tarfiles_dir):
612                    return False
613            if os.path.isdir("%s/rpms" % tmpdir):
614                if not self.ship_configs(host, user,
615                        "%s/rpms" % tmpdir, tarfiles_dir):
616                    return False
617            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
618            if not self.ssh_cmd(user, host,
619                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
620                            (pid, eid, tclfile), "modexp"):
621                return False
622            return True
623        elif state == "swapped":
624            # Create the federation config dirs (do not move outside the
625            # conditional.  Happens later in new expriment creation)
626            if not self.ssh_cmd(user, host, 
627                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
628                return False
629            # Remote experiment swapped out.  Modify it and swap it in.
630            for f in base_confs:
631                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
632                        "%s/%s" % (proj_dir, f)):
633                    return False
634            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
635                    proj_dir):
636                return False
637            if os.path.isdir("%s/tarfiles" % tmpdir):
638                if not self.ship_configs(host, user,
639                        "%s/tarfiles" % tmpdir, tarfiles_dir):
640                    return False
641            if os.path.isdir("%s/rpms" % tmpdir):
642                if not self.ship_configs(host, user,
643                        "%s/rpms" % tmpdir, tarfiles_dir):
644                    return False
645            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
646            if not self.ssh_cmd(user, host,
647                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
648                    "modexp"):
649                return False
650            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
651            if not self.ssh_cmd(user, host,
652                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
653                    "swapexp"):
654                return False
655            return True
656        elif state == "none":
657            # No remote experiment.  Create one.  We do this in 2 steps so we
658            # can put the configuration files and scripts into the new
659            # experiment directories.
660
661            # Tarfiles must be present for creation to work
662            if os.path.isdir("%s/tarfiles" % tmpdir):
663                if not self.ship_configs(host, user,
664                        "%s/tarfiles" % tmpdir, tarfiles_dir):
665                    return False
666            if os.path.isdir("%s/rpms" % tmpdir):
667                if not self.ship_configs(host, user,
668                        "%s/rpms" % tmpdir, tarfiles_dir):
669                    return False
670            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
671            if not self.ssh_cmd(user, host,
672                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
673                            (pid, eid, tclfile), "startexp"):
674                return False
675            # Create the federation config dirs (do not move outside the
676            # conditional.)
677            if not self.ssh_cmd(user, host, 
678                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
679                return False
680            # After startexp the per-experiment directories exist
681            for f in base_confs:
682                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
683                        "%s/%s" % (proj_dir, f)):
684                    return False
685            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
686                    proj_dir):
687                return False
688            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
689            if not self.ssh_cmd(user, host,
690                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
691                    "swapexp"):
692                return False
693            return True
694        else:
695            self.log.debug("[start_segment]:unknown state %s" % state)
696            return False
697
698    def stop_segment(self, tb, eid, tbparams):
699        """
700        Stop a sub experiment by calling swapexp on the federant
701        """
702        user = tbparams[tb]['user']
703        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
704        pid = tbparams[tb]['project']
705
706        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
707        return self.ssh_cmd(user, host,
708                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
709
710       
711    def generate_ssh_keys(self, dest, type="rsa" ):
712        """
713        Generate a set of keys for the gateways to use to talk.
714
715        Keys are of type type and are stored in the required dest file.
716        """
717        valid_types = ("rsa", "dsa")
718        t = type.lower();
719        if t not in valid_types: raise ValueError
720        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
721
722        try:
723            trace = open("/dev/null", "w")
724        except IOError:
725            raise service_error(service_error.internal,
726                    "Cannot open /dev/null??");
727
728        # May raise CalledProcessError
729        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
730        rv = call(cmd, stdout=trace, stderr=trace)
731        if rv != 0:
732            raise service_error(service_error.internal, 
733                    "Cannot generate nonce ssh keys.  %s return code %d" \
734                            % (self.ssh_keygen, rv))
735
736    def gentopo(self, str):
737        """
738        Generate the topology dtat structure from the splitter's XML
739        representation of it.
740
741        The topology XML looks like:
742            <experiment>
743                <nodes>
744                    <node><vname></vname><ips>ip1:ip2</ips></node>
745                </nodes>
746                <lans>
747                    <lan>
748                        <vname></vname><vnode></vnode><ip></ip>
749                        <bandwidth></bandwidth><member>node:port</member>
750                    </lan>
751                </lans>
752        """
753        class topo_parse:
754            """
755            Parse the topology XML and create the dats structure.
756            """
757            def __init__(self):
758                # Typing of the subelements for data conversion
759                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
760                self.int_subelements = ( 'bandwidth',)
761                self.float_subelements = ( 'delay',)
762                # The final data structure
763                self.nodes = [ ]
764                self.lans =  [ ]
765                self.topo = { \
766                        'node': self.nodes,\
767                        'lan' : self.lans,\
768                    }
769                self.element = { }  # Current element being created
770                self.chars = ""     # Last text seen
771
772            def end_element(self, name):
773                # After each sub element the contents is added to the current
774                # element or to the appropriate list.
775                if name == 'node':
776                    self.nodes.append(self.element)
777                    self.element = { }
778                elif name == 'lan':
779                    self.lans.append(self.element)
780                    self.element = { }
781                elif name in self.str_subelements:
782                    self.element[name] = self.chars
783                    self.chars = ""
784                elif name in self.int_subelements:
785                    self.element[name] = int(self.chars)
786                    self.chars = ""
787                elif name in self.float_subelements:
788                    self.element[name] = float(self.chars)
789                    self.chars = ""
790
791            def found_chars(self, data):
792                self.chars += data.rstrip()
793
794
795        tp = topo_parse();
796        parser = xml.parsers.expat.ParserCreate()
797        parser.EndElementHandler = tp.end_element
798        parser.CharacterDataHandler = tp.found_chars
799
800        parser.Parse(str)
801
802        return tp.topo
803       
804
805    def genviz(self, topo):
806        """
807        Generate the visualization the virtual topology
808        """
809
810        neato = "/usr/local/bin/neato"
811        # These are used to parse neato output and to create the visualization
812        # file.
813        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
814        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
815                "%s</type></node>"
816
817        try:
818            # Node names
819            nodes = [ n['vname'] for n in topo['node'] ]
820            topo_lans = topo['lan']
821        except KeyError:
822            raise service_error(service_error.internal, "Bad topology")
823
824        lans = { }
825        links = { }
826
827        # Walk through the virtual topology, organizing the connections into
828        # 2-node connections (links) and more-than-2-node connections (lans).
829        # When a lan is created, it's added to the list of nodes (there's a
830        # node in the visualization for the lan).
831        for l in topo_lans:
832            if links.has_key(l['vname']):
833                if len(links[l['vname']]) < 2:
834                    links[l['vname']].append(l['vnode'])
835                else:
836                    nodes.append(l['vname'])
837                    lans[l['vname']] = links[l['vname']]
838                    del links[l['vname']]
839                    lans[l['vname']].append(l['vnode'])
840            elif lans.has_key(l['vname']):
841                lans[l['vname']].append(l['vnode'])
842            else:
843                links[l['vname']] = [ l['vnode'] ]
844
845
846        # Open up a temporary file for dot to turn into a visualization
847        try:
848            df, dotname = tempfile.mkstemp()
849            dotfile = os.fdopen(df, 'w')
850        except IOError:
851            raise service_error(service_error.internal,
852                    "Failed to open file in genviz")
853
854        # Generate a dot/neato input file from the links, nodes and lans
855        try:
856            print >>dotfile, "graph G {"
857            for n in nodes:
858                print >>dotfile, '\t"%s"' % n
859            for l in links.keys():
860                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
861            for l in lans.keys():
862                for n in lans[l]:
863                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
864            print >>dotfile, "}"
865            dotfile.close()
866        except TypeError:
867            raise service_error(service_error.internal,
868                    "Single endpoint link in vtopo")
869        except IOError:
870            raise service_error(service_error.internal, "Cannot write dot file")
871
872        # Use dot to create a visualization
873        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
874                '-Gpack=true', dotname], stdout=PIPE)
875
876        # Translate dot to vis format
877        vis_nodes = [ ]
878        vis = { 'node': vis_nodes }
879        for line in dot.stdout:
880            m = vis_re.match(line)
881            if m:
882                vn = m.group(1)
883                vis_node = {'name': vn, \
884                        'x': float(m.group(2)),\
885                        'y' : float(m.group(3)),\
886                    }
887                if vn in links.keys() or vn in lans.keys():
888                    vis_node['type'] = 'lan'
889                else:
890                    vis_node['type'] = 'node'
891                vis_nodes.append(vis_node)
892        rv = dot.wait()
893
894        os.remove(dotname)
895        if rv == 0 : return vis
896        else: return None
897
898    def get_access(self, tb, nodes, user, tbparam, master, export_project,
899            access_user):
900        """
901        Get access to testbed through fedd and set the parameters for that tb
902        """
903
904        # XXX This is needless complexity.  It must vanish.
905        translate_attr = {
906            'slavenodestartcmd': 'expstart',
907            'slaveconnectorstartcmd': 'gwstart',
908            'masternodestartcmd': 'mexpstart',
909            'masterconnectorstartcmd': 'mgwstart',
910            'connectorimage': 'gwimage',
911            'connectortype': 'gwtype',
912            'tunnelcfg': 'tun',
913            'tunnelinterface': 'tunnelinterface',
914            'smbshare': 'smbshare',
915            'slaveconnectorcmd': 'gwcmd',
916            'slaveconnectorcmdparams': 'gwcmdparams',
917            'masterconnectorcmd': 'mgwcmd',
918            'masterconnectorcmdparams': 'mgwcmdparams',
919        }
920
921        uri = self.tbmap.get(tb, None)
922        if not uri:
923            raise service_error(serice_error.server_config, 
924                    "Unknown testbed: %s" % tb)
925
926        # currently this lumps all users into one service access group
927        service_keys = [ a for u in user \
928                for a in u.get('access', []) \
929                    if a.has_key('sshPubkey')]
930
931        if len(service_keys) == 0:
932            raise service_error(service_error.req, 
933                    "Must have at least one SSH pubkey for services")
934
935
936        for p, u in access_user:
937            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
938                    "to %s") %  ((p or "None"), u, uri))
939
940            if p:
941                # Request with user and project specified
942                req = {\
943                        'destinationTestbed' : { 'uri' : uri },
944                        'project': { 
945                            'name': {'localname': p},
946                            'user': [ {'userID': { 'localname': u } } ],
947                            },
948                        'user':  user,
949                        'allocID' : { 'localname': 'test' },
950                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
951                        'serviceAccess' : service_keys
952                    }
953            else:
954                # Request with only user specified
955                req = {\
956                        'destinationTestbed' : { 'uri' : uri },
957                        'user':  [ {'userID': { 'localname': u } } ],
958                        'allocID' : { 'localname': 'test' },
959                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
960                        'serviceAccess' : service_keys
961                    }
962
963            if tb == master:
964                # NB, the export_project parameter is a dict that includes
965                # the type
966                req['exportProject'] = export_project
967
968            # node resources if any
969            if nodes != None and len(nodes) > 0:
970                rnodes = [ ]
971                for n in nodes:
972                    rn = { }
973                    image, hw, count = n.split(":")
974                    if image: rn['image'] = [ image ]
975                    if hw: rn['hardware'] = [ hw ]
976                    if count and int(count) >0 : rn['count'] = int(count)
977                    rnodes.append(rn)
978                req['resources']= { }
979                req['resources']['node'] = rnodes
980
981            try:
982                if self.local_access.has_key(uri):
983                    # Local access call
984                    req = { 'RequestAccessRequestBody' : req }
985                    r = self.local_access[uri].RequestAccess(req, 
986                            fedid(file=self.cert_file))
987                    r = { 'RequestAccessResponseBody' : r }
988                else:
989                    r = self.call_RequestAccess(uri, req, 
990                            self.cert_file, self.cert_pwd, self.trusted_certs)
991            except service_error, e:
992                if e.code == service_error.access:
993                    self.log.debug("[get_access] Access denied")
994                    r = None
995                    continue
996                else:
997                    raise e
998
999            if r.has_key('RequestAccessResponseBody'):
1000                # Through to here we have a valid response, not a fault.
1001                # Access denied is a fault, so something better or worse than
1002                # access denied has happened.
1003                r = r['RequestAccessResponseBody']
1004                self.log.debug("[get_access] Access granted")
1005                break
1006            else:
1007                raise service_error(service_error.protocol,
1008                        "Bad proxy response")
1009       
1010        if not r:
1011            raise service_error(service_error.access, 
1012                    "Access denied by %s (%s)" % (tb, uri))
1013
1014        e = r['emulab']
1015        p = e['project']
1016        tbparam[tb] = { 
1017                "boss": e['boss'],
1018                "host": e['ops'],
1019                "domain": e['domain'],
1020                "fs": e['fileServer'],
1021                "eventserver": e['eventServer'],
1022                "project": unpack_id(p['name']),
1023                "emulab" : e,
1024                "allocID" : r['allocID'],
1025                }
1026        # Make the testbed name be the label the user applied
1027        p['testbed'] = {'localname': tb }
1028
1029        for u in p['user']:
1030            role = u.get('role', None)
1031            if role == 'experimentCreation':
1032                tbparam[tb]['user'] = unpack_id(u['userID'])
1033                break
1034        else:
1035            raise service_error(service_error.internal, 
1036                    "No createExperimentUser from %s" %tb)
1037
1038        for a in e['fedAttr']:
1039            if a['attribute']:
1040                key = translate_attr.get(a['attribute'].lower(), None)
1041                if key:
1042                    tbparam[tb][key]= a['value']
1043       
1044    def release_access(self, tb, aid):
1045        """
1046        Release access to testbed through fedd
1047        """
1048
1049        uri = self.tbmap.get(tb, None)
1050        if not uri:
1051            raise service_error(serice_error.server_config, 
1052                    "Unknown testbed: %s" % tb)
1053
1054        if self.local_access.has_key(uri):
1055            resp = self.local_access[uri].ReleaseAccess(\
1056                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1057                    fedid(file=self.cert_file))
1058            resp = { 'ReleaseAccessResponseBody': resp } 
1059        else:
1060            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1061                    self.cert_file, self.cert_pwd, self.trusted_certs)
1062
1063        # better error coding
1064
1065    def remote_splitter(self, uri, desc, master):
1066
1067        req = {
1068                'description' : { 'ns2description': desc },
1069                'master': master,
1070                'include_fedkit': bool(self.fedkit)
1071            }
1072
1073        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1074                self.trusted_certs)
1075
1076        if r.has_key('Ns2SplitResponseBody'):
1077            r = r['Ns2SplitResponseBody']
1078            if r.has_key('output'):
1079                return r['output'].splitlines()
1080            else:
1081                raise service_error(service_error.protocol, 
1082                        "Bad splitter response (no output)")
1083        else:
1084            raise service_error(service_error.protocol, "Bad splitter response")
1085       
1086    class current_testbed:
1087        """
1088        Object for collecting the current testbed description.  The testbed
1089        description is saved to a file with the local testbed variables
1090        subsittuted line by line.
1091        """
1092        def __init__(self, eid, tmpdir, fedkit):
1093            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1094            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1095            self.current_testbed = None
1096            self.testbed_file = None
1097
1098            self.def_expstart = \
1099                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1100            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1101            self.def_gwstart = \
1102                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1103            self.def_mgwstart = \
1104                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1105            self.def_gwimage = "FBSD61-TUNNEL2";
1106            self.def_gwtype = "pc";
1107            self.def_mgwcmd = '# '
1108            self.def_mgwcmdparams = ''
1109            self.def_gwcmd = '# '
1110            self.def_gwcmdparams = ''
1111
1112            self.eid = eid
1113            self.tmpdir = tmpdir
1114            self.fedkit = fedkit
1115
1116        def __call__(self, line, master, allocated, tbparams):
1117            # Capture testbed topology descriptions
1118            if self.current_testbed == None:
1119                m = self.begin_testbed.match(line)
1120                if m != None:
1121                    self.current_testbed = m.group(1)
1122                    if self.current_testbed == None:
1123                        raise service_error(service_error.req,
1124                                "Bad request format (unnamed testbed)")
1125                    allocated[self.current_testbed] = \
1126                            allocated.get(self.current_testbed,0) + 1
1127                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1128                    if not os.path.exists(tb_dir):
1129                        try:
1130                            os.mkdir(tb_dir)
1131                        except IOError:
1132                            raise service_error(service_error.internal,
1133                                    "Cannot create %s" % tb_dir)
1134                    try:
1135                        self.testbed_file = open("%s/%s.%s.tcl" %
1136                                (tb_dir, self.eid, self.current_testbed), 'w')
1137                    except IOError:
1138                        self.testbed_file = None
1139                    return True
1140                else: return False
1141            else:
1142                m = self.end_testbed.match(line)
1143                if m != None:
1144                    if m.group(1) != self.current_testbed:
1145                        raise service_error(service_error.internal, 
1146                                "Mismatched testbed markers!?")
1147                    if self.testbed_file != None: 
1148                        self.testbed_file.close()
1149                        self.testbed_file = None
1150                    self.current_testbed = None
1151                elif self.testbed_file:
1152                    # Substitute variables and put the line into the local
1153                    # testbed file.
1154                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1155                            self.def_gwtype)
1156                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1157                            self.def_gwimage)
1158                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1159                            self.def_mgwstart)
1160                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1161                            self.def_mexpstart)
1162                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1163                            self.def_gwstart)
1164                    expstart = tbparams[self.current_testbed].get('expstart', 
1165                            self.def_expstart)
1166                    project = tbparams[self.current_testbed].get('project')
1167                    gwcmd = tbparams[self.current_testbed].get('gwcmd', 
1168                            self.def_gwcmd)
1169                    gwcmdparams = tbparams[self.current_testbed].get(\
1170                            'gwcmdparams', self.def_gwcmdparams)
1171                    mgwcmd = tbparams[self.current_testbed].get('mgwcmd', 
1172                            self.def_gwcmd)
1173                    mgwcmdparams = tbparams[self.current_testbed].get(\
1174                            'mgwcmdparams', self.def_gwcmdparams)
1175                    line = re.sub("GWTYPE", gwtype, line)
1176                    line = re.sub("GWIMAGE", gwimage, line)
1177                    if self.current_testbed == master:
1178                        line = re.sub("GWSTART", mgwstart, line)
1179                        line = re.sub("EXPSTART", mexpstart, line)
1180                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1181                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1182                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1183                    else:
1184                        line = re.sub("GWSTART", gwstart, line)
1185                        line = re.sub("EXPSTART", expstart, line)
1186                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1187                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1188                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1189                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1190                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1191                    line = re.sub("EID", self.eid, line)
1192                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1193                            (project, self.eid), line)
1194                    if self.fedkit:
1195                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1196                                line)
1197                    print >>self.testbed_file, line
1198                return True
1199
1200    class allbeds:
1201        """
1202        Process the Allbeds section.  Get access to each federant and save the
1203        parameters in tbparams
1204        """
1205        def __init__(self, get_access):
1206            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1207            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1208            self.in_allbeds = False
1209            self.get_access = get_access
1210
1211        def __call__(self, line, user, tbparams, master, export_project,
1212                access_user):
1213            # Testbed access parameters
1214            if not self.in_allbeds:
1215                if self.begin_allbeds.match(line):
1216                    self.in_allbeds = True
1217                    return True
1218                else:
1219                    return False
1220            else:
1221                if self.end_allbeds.match(line):
1222                    self.in_allbeds = False
1223                else:
1224                    nodes = line.split('|')
1225                    tb = nodes.pop(0)
1226                    self.get_access(tb, nodes, user, tbparams, master,
1227                            export_project, access_user)
1228                return True
1229
1230    class gateways:
1231        def __init__(self, eid, master, tmpdir, gw_pubkey,
1232                gw_secretkey, copy_file, fedkit):
1233            self.begin_gateways = \
1234                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1235            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1236            self.current_gateways = None
1237            self.control_gateway = None
1238            self.active_end = { }
1239
1240            self.eid = eid
1241            self.master = master
1242            self.tmpdir = tmpdir
1243            self.gw_pubkey_base = gw_pubkey
1244            self.gw_secretkey_base = gw_secretkey
1245
1246            self.copy_file = copy_file
1247            self.fedkit = fedkit
1248
1249
1250        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1251                active_end, tbparams, dtb, myname, desthost, type):
1252            """
1253            Produce a gateway configuration file from a gateways line.
1254            """
1255
1256            sproject = tbparams[gw].get('project', 'project')
1257            dproject = tbparams[dtb].get('project', 'project')
1258            sdomain = ".%s.%s%s" % (eid, sproject,
1259                    tbparams[gw].get('domain', ".example.com"))
1260            ddomain = ".%s.%s%s" % (eid, dproject,
1261                    tbparams[dtb].get('domain', ".example.com"))
1262            boss = tbparams[master].get('boss', "boss")
1263            fs = tbparams[master].get('fs', "fs")
1264            event_server = "%s%s" % \
1265                    (tbparams[gw].get('eventserver', "event_server"),
1266                            tbparams[gw].get('domain', "example.com"))
1267            remote_event_server = "%s%s" % \
1268                    (tbparams[dtb].get('eventserver', "event_server"),
1269                            tbparams[dtb].get('domain', "example.com"))
1270            seer_control = "%s%s" % \
1271                    (tbparams[gw].get('control', "control"), sdomain)
1272            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1273
1274            if self.fedkit:
1275                remote_script_dir = "/usr/local/federation/bin"
1276                local_script_dir = "/usr/local/federation/bin"
1277            else:
1278                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1279                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1280
1281            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1282            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1283            tunnel_cfg = tbparams[gw].get("tun", "false")
1284
1285            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1286            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1287
1288            # translate to lower case so the `hostname` hack for specifying
1289            # configuration files works.
1290            conf_file = conf_file.lower();
1291            remote_conf_file = remote_conf_file.lower();
1292
1293            if dtb == master:
1294                active = "false"
1295            elif gw == master:
1296                active = "true"
1297            elif active_end.has_key('%s-%s' % (dtb, gw)):
1298                active = "false"
1299            else:
1300                active_end['%s-%s' % (gw, dtb)] = 1
1301                active = "true"
1302
1303            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1304            print >>gwconfig, "Active: %s" % active
1305            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1306            if tunnel_iface:
1307                print >>gwconfig, "Interface: %s" % tunnel_iface
1308            print >>gwconfig, "BossName: %s" % boss
1309            print >>gwconfig, "FsName: %s" % fs
1310            print >>gwconfig, "EventServerName: %s" % event_server
1311            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1312            print >>gwconfig, "SeerControl: %s" % seer_control
1313            print >>gwconfig, "Type: %s" % type
1314            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1315            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1316                    local_script_dir
1317            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1318            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1319            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1320                    (remote_conf_dir, remote_conf_file)
1321            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1322            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1323            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1324            gwconfig.close()
1325
1326            return active == "true"
1327
1328        def __call__(self, line, allocated, tbparams):
1329            # Process gateways
1330            if not self.current_gateways:
1331                m = self.begin_gateways.match(line)
1332                if m:
1333                    self.current_gateways = m.group(1)
1334                    if allocated.has_key(self.current_gateways):
1335                        # This test should always succeed
1336                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1337                        if not os.path.exists(tb_dir):
1338                            try:
1339                                os.mkdir(tb_dir)
1340                            except IOError:
1341                                raise service_error(service_error.internal,
1342                                        "Cannot create %s" % tb_dir)
1343                    else:
1344                        # XXX
1345                        self.log.error("[gateways]: Ignoring gateways for " + \
1346                                "unknown testbed %s" % self.current_gateways)
1347                        self.current_gateways = None
1348                    return True
1349                else:
1350                    return False
1351            else:
1352                m = self.end_gateways.match(line)
1353                if m :
1354                    if m.group(1) != self.current_gateways:
1355                        raise service_error(service_error.internal,
1356                                "Mismatched gateway markers!?")
1357                    if self.control_gateway:
1358                        try:
1359                            cc = open("%s/%s/client.conf" %
1360                                    (self.tmpdir, self.current_gateways), 'w')
1361                            print >>cc, "ControlGateway: %s" % \
1362                                    self.control_gateway
1363                            if tbparams[self.master].has_key('smbshare'):
1364                                print >>cc, "SMBSHare: %s" % \
1365                                        tbparams[self.master]['smbshare']
1366                            print >>cc, "ProjectUser: %s" % \
1367                                    tbparams[self.master]['user']
1368                            print >>cc, "ProjectName: %s" % \
1369                                    tbparams[self.master]['project']
1370                            cc.close()
1371                        except IOError:
1372                            raise service_error(service_error.internal,
1373                                    "Error creating client config")
1374                        try:
1375                            cc = open("%s/%s/seer.conf" %
1376                                    (self.tmpdir, self.current_gateways),
1377                                    'w')
1378                            if self.current_gateways != self.master:
1379                                print >>cc, "ControlNode: %s" % \
1380                                        self.control_gateway
1381                            print >>cc, "ExperimentID: %s/%s" % \
1382                                    ( tbparams[self.master]['project'], \
1383                                    self.eid )
1384                            cc.close()
1385                        except IOError:
1386                            raise service_error(service_error.internal,
1387                                    "Error creating seer config")
1388                    else:
1389                        debug.error("[gateways]: No control gateway for %s" %\
1390                                    self.current_gateways)
1391                    self.current_gateways = None
1392                else:
1393                    dtb, myname, desthost, type = line.split(" ")
1394
1395                    if type == "control" or type == "both":
1396                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1397                                self.eid, 
1398                                tbparams[self.current_gateways]['project'],
1399                                tbparams[self.current_gateways]['domain'])
1400                    try:
1401                        active = self.gateway_conf_file(self.current_gateways,
1402                                self.master, self.eid, self.gw_pubkey_base,
1403                                self.gw_secretkey_base,
1404                                self.active_end, tbparams, dtb, myname,
1405                                desthost, type)
1406                    except IOError, e:
1407                        raise service_error(service_error.internal,
1408                                "Failed to write config file for %s" % \
1409                                        self.current_gateway)
1410           
1411                    gw_pubkey = "%s/keys/%s" % \
1412                            (self.tmpdir, self.gw_pubkey_base)
1413                    gw_secretkey = "%s/keys/%s" % \
1414                            (self.tmpdir, self.gw_secretkey_base)
1415
1416                    pkfile = "%s/%s/%s" % \
1417                            ( self.tmpdir, self.current_gateways, 
1418                                    self.gw_pubkey_base)
1419                    skfile = "%s/%s/%s" % \
1420                            ( self.tmpdir, self.current_gateways, 
1421                                    self.gw_secretkey_base)
1422
1423                    if not os.path.exists(pkfile):
1424                        try:
1425                            self.copy_file(gw_pubkey, pkfile)
1426                        except IOError:
1427                            service_error(service_error.internal,
1428                                    "Failed to copy pubkey file")
1429
1430                    if active and not os.path.exists(skfile):
1431                        try:
1432                            self.copy_file(gw_secretkey, skfile)
1433                        except IOError:
1434                            service_error(service_error.internal,
1435                                    "Failed to copy secretkey file")
1436                return True
1437
1438    class shunt_to_file:
1439        """
1440        Simple class to write data between two regexps to a file.
1441        """
1442        def __init__(self, begin, end, filename):
1443            """
1444            Begin shunting on a match of begin, stop on end, send data to
1445            filename.
1446            """
1447            self.begin = re.compile(begin)
1448            self.end = re.compile(end)
1449            self.in_shunt = False
1450            self.file = None
1451            self.filename = filename
1452
1453        def __call__(self, line):
1454            """
1455            Call this on each line in the input that may be shunted.
1456            """
1457            if not self.in_shunt:
1458                if self.begin.match(line):
1459                    self.in_shunt = True
1460                    try:
1461                        self.file = open(self.filename, "w")
1462                    except:
1463                        self.file = None
1464                        raise
1465                    return True
1466                else:
1467                    return False
1468            else:
1469                if self.end.match(line):
1470                    if self.file: 
1471                        self.file.close()
1472                        self.file = None
1473                    self.in_shunt = False
1474                else:
1475                    if self.file:
1476                        print >>self.file, line
1477                return True
1478
1479    class shunt_to_list:
1480        """
1481        Same interface as shunt_to_file.  Data collected in self.list, one list
1482        element per line.
1483        """
1484        def __init__(self, begin, end):
1485            self.begin = re.compile(begin)
1486            self.end = re.compile(end)
1487            self.in_shunt = False
1488            self.list = [ ]
1489       
1490        def __call__(self, line):
1491            if not self.in_shunt:
1492                if self.begin.match(line):
1493                    self.in_shunt = True
1494                    return True
1495                else:
1496                    return False
1497            else:
1498                if self.end.match(line):
1499                    self.in_shunt = False
1500                else:
1501                    self.list.append(line)
1502                return True
1503
1504    class shunt_to_string:
1505        """
1506        Same interface as shunt_to_file.  Data collected in self.str, all in
1507        one string.
1508        """
1509        def __init__(self, begin, end):
1510            self.begin = re.compile(begin)
1511            self.end = re.compile(end)
1512            self.in_shunt = False
1513            self.str = ""
1514       
1515        def __call__(self, line):
1516            if not self.in_shunt:
1517                if self.begin.match(line):
1518                    self.in_shunt = True
1519                    return True
1520                else:
1521                    return False
1522            else:
1523                if self.end.match(line):
1524                    self.in_shunt = False
1525                else:
1526                    self.str += line
1527                return True
1528
1529    def create_experiment(self, req, fid):
1530        """
1531        The external interface to experiment creation called from the
1532        dispatcher.
1533
1534        Creates a working directory, splits the incoming description using the
1535        splitter script and parses out the avrious subsections using the
1536        lcasses above.  Once each sub-experiment is created, use pooled threads
1537        to instantiate them and start it all up.
1538        """
1539
1540        if not self.auth.check_attribute(fid, 'create'):
1541            raise service_error(service_error.access, "Create access denied")
1542
1543        try:
1544            tmpdir = tempfile.mkdtemp(prefix="split-")
1545        except IOError:
1546            raise service_error(service_error.internal, "Cannot create tmp dir")
1547
1548        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1549        gw_secretkey_base = "fed.%s" % self.ssh_type
1550        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1551        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1552        tclfile = tmpdir + "/experiment.tcl"
1553        tbparams = { }
1554        try:
1555            access_user = self.accessdb[fid]
1556        except KeyError:
1557            raise service_error(service_error.internal,
1558                    "Access map and authorizer out of sync in " + \
1559                            "create_experiment for fedid %s"  % fid)
1560
1561        pid = "dummy"
1562        gid = "dummy"
1563        # XXX
1564        fail_soft = False
1565
1566        try:
1567            os.mkdir(tmpdir+"/keys")
1568        except OSError:
1569            raise service_error(service_error.internal,
1570                    "Can't make temporary dir")
1571
1572        req = req.get('CreateRequestBody', None)
1573        if not req:
1574            raise service_error(service_error.req,
1575                    "Bad request format (no CreateRequestBody)")
1576        # The tcl parser needs to read a file so put the content into that file
1577        descr=req.get('experimentdescription', None)
1578        if descr:
1579            file_content=descr.get('ns2description', None)
1580            if file_content:
1581                try:
1582                    f = open(tclfile, 'w')
1583                    f.write(file_content)
1584                    f.close()
1585                except IOError:
1586                    raise service_error(service_error.internal,
1587                            "Cannot write temp experiment description")
1588            else:
1589                raise service_error(service_error.req, 
1590                        "Only ns2descriptions supported")
1591        else:
1592            raise service_error(service_error.req, "No experiment description")
1593
1594        if req.has_key('experimentID') and \
1595                req['experimentID'].has_key('localname'):
1596            eid = req['experimentID']['localname']
1597            self.state_lock.acquire()
1598            while (self.state.has_key(eid)):
1599                eid += random.choice(string.ascii_letters)
1600            # To avoid another thread picking this localname
1601            self.state[eid] = "placeholder"
1602            self.state_lock.release()
1603        else:
1604            eid = self.exp_stem
1605            for i in range(0,5):
1606                eid += random.choice(string.ascii_letters)
1607            self.state_lock.acquire()
1608            while (self.state.has_key(eid)):
1609                eid = self.exp_stem
1610                for i in range(0,5):
1611                    eid += random.choice(string.ascii_letters)
1612            # To avoid another thread picking this localname
1613            self.state[eid] = "placeholder"
1614            self.state_lock.release()
1615
1616        try: 
1617            # This catches exceptions to clear the placeholder if necessary
1618            try:
1619                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1620            except ValueError:
1621                raise service_error(service_error.server_config, 
1622                        "Bad key type (%s)" % self.ssh_type)
1623
1624            user = req.get('user', None)
1625            if user == None:
1626                raise service_error(service_error.req, "No user")
1627
1628            master = req.get('master', None)
1629            if not master:
1630                raise service_error(service_error.req,
1631                        "No master testbed label")
1632            export_project = req.get('exportProject', None)
1633            if not export_project:
1634                raise service_error(service_error.req, "No export project")
1635           
1636            if self.splitter_url:
1637                self.log.debug("Calling remote splitter at %s" % \
1638                        self.splitter_url)
1639                split_data = self.remote_splitter(self.splitter_url,
1640                        file_content, master)
1641            else:
1642                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1643                    str(self.muxmax), '-m', master]
1644
1645                if self.fedkit:
1646                    tclcmd.append('-k')
1647
1648                tclcmd.extend([pid, gid, eid, tclfile])
1649
1650                self.log.debug("running local splitter %s", " ".join(tclcmd))
1651                tclparser = Popen(tclcmd, stdout=PIPE)
1652                split_data = tclparser.stdout
1653
1654            allocated = { }         # Testbeds we can access
1655            started = { }           # Testbeds where a sub-experiment started
1656                                # successfully
1657
1658            # Objects to parse the splitter output (defined above)
1659            parse_current_testbed = self.current_testbed(eid, tmpdir,
1660                    self.fedkit)
1661            parse_allbeds = self.allbeds(self.get_access)
1662            parse_gateways = self.gateways(eid, master, tmpdir,
1663                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1664                    self.fedkit)
1665            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1666                        "^#\s+End\s+Vtopo")
1667            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1668                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1669            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1670                    "^#\s+End\s+tarfiles")
1671            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1672                    "^#\s+End\s+rpms")
1673
1674            # Working on the split data
1675            for line in split_data:
1676                line = line.rstrip()
1677                if parse_current_testbed(line, master, allocated, tbparams):
1678                    continue
1679                elif parse_allbeds(line, user, tbparams, master, export_project,
1680                        access_user):
1681                    continue
1682                elif parse_gateways(line, allocated, tbparams):
1683                    continue
1684                elif parse_vtopo(line):
1685                    continue
1686                elif parse_hostnames(line):
1687                    continue
1688                elif parse_tarfiles(line):
1689                    continue
1690                elif parse_rpms(line):
1691                    continue
1692                else:
1693                    raise service_error(service_error.internal, 
1694                            "Bad tcl parse? %s" % line)
1695            # Virtual topology and visualization
1696            vtopo = self.gentopo(parse_vtopo.str)
1697            if not vtopo:
1698                raise service_error(service_error.internal, 
1699                        "Failed to generate virtual topology")
1700
1701            vis = self.genviz(vtopo)
1702            if not vis:
1703                raise service_error(service_error.internal, 
1704                        "Failed to generate visualization")
1705           
1706            # save federant information
1707            for k in allocated.keys():
1708                tbparams[k]['federant'] = {\
1709                        'name': [ { 'localname' : eid} ],\
1710                        'emulab': tbparams[k]['emulab'],\
1711                        'allocID' : tbparams[k]['allocID'],\
1712                        'master' : k == master,\
1713                    }
1714
1715
1716            # Copy tarfiles and rpms needed at remote sites into a staging area
1717            try:
1718                if self.fedkit:
1719                    parse_tarfiles.list.append(self.fedkit)
1720                for t in parse_tarfiles.list:
1721                    if not os.path.exists("%s/tarfiles" % tmpdir):
1722                        os.mkdir("%s/tarfiles" % tmpdir)
1723                    self.copy_file(t, "%s/tarfiles/%s" % \
1724                            (tmpdir, os.path.basename(t)))
1725                for r in parse_rpms.list:
1726                    if not os.path.exists("%s/rpms" % tmpdir):
1727                        os.mkdir("%s/rpms" % tmpdir)
1728                    self.copy_file(r, "%s/rpms/%s" % \
1729                            (tmpdir, os.path.basename(r)))
1730            except IOError, e:
1731                raise service_error(service_error.internal, 
1732                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1733
1734        except service_error, e:
1735            # If something goes wrong in the parse (usually an access error)
1736            # clear the placeholder state.  From here on out the code delays
1737            # exceptions.
1738            self.state_lock.acquire()
1739            del self.state[eid]
1740            self.state_lock.release()
1741            raise e
1742
1743        thread_pool = self.thread_pool(self.nthreads)
1744        threads = [ ]
1745
1746        for tb in [ k for k in allocated.keys() if k != master]:
1747            # Create and start a thread to start the segment, and save it to
1748            # get the return value later
1749            thread_pool.wait_for_slot()
1750            t  = self.pooled_thread(target=self.start_segment, 
1751                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1752                    pdata=thread_pool, trace_file=self.trace_file)
1753            threads.append(t)
1754            t.start()
1755
1756        # Wait until all finish
1757        thread_pool.wait_for_all_done()
1758
1759        # If none failed, start the master
1760        failed = [ t.getName() for t in threads if not t.rv ]
1761
1762        if len(failed) == 0:
1763            if not self.start_segment(master, eid, tbparams, tmpdir):
1764                failed.append(master)
1765
1766        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1767        # If one failed clean up, unless fail_soft is set
1768        if failed:
1769            if not fail_soft:
1770                thread_pool.clear()
1771                for tb in succeeded:
1772                    # Create and start a thread to stop the segment
1773                    thread_pool.wait_for_slot()
1774                    t  = self.pooled_thread(target=self.stop_segment, 
1775                            args=(tb, eid, tbparams), name=tb,
1776                            pdata=thread_pool, trace_file=self.trace_file)
1777                    t.start()
1778                # Wait until all finish
1779                thread_pool.wait_for_all_done()
1780
1781                # release the allocations
1782                for tb in tbparams.keys():
1783                    self.release_access(tb, tbparams[tb]['allocID'])
1784                # Remove the placeholder
1785                self.state_lock.acquire()
1786                del self.state[eid]
1787                self.state_lock.release()
1788
1789                raise service_error(service_error.federant,
1790                    "Swap in failed on %s" % ",".join(failed))
1791        else:
1792            self.log.info("[start_segment]: Experiment %s started" % eid)
1793
1794        # Generate an ID for the experiment (slice) and a certificate that the
1795        # allocator can use to prove they own it.  We'll ship it back through
1796        # the encrypted connection.
1797        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1798
1799        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1800
1801        # Walk up tmpdir, deleting as we go
1802        for path, dirs, files in os.walk(tmpdir, topdown=False):
1803            for f in files:
1804                os.remove(os.path.join(path, f))
1805            for d in dirs:
1806                os.rmdir(os.path.join(path, d))
1807        os.rmdir(tmpdir)
1808
1809        # The deepcopy prevents the allocation ID and other binaries from being
1810        # translated into other formats
1811        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1812                for tb in tbparams.keys() \
1813                    if tbparams[tb].has_key('federant') ],\
1814                    'vtopo': vtopo,\
1815                    'vis' : vis,
1816                    'experimentID' : [\
1817                            { 'fedid': copy.copy(expid) }, \
1818                            { 'localname': eid },\
1819                        ],\
1820                    'experimentAccess': { 'X509' : expcert },\
1821                }
1822        # remove the allocationID info from each federant
1823        for f in resp['federant']:
1824            if f.has_key('allocID'): del f['allocID']
1825
1826        # Insert the experiment into our state and update the disk copy
1827        self.state_lock.acquire()
1828        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1829                for tb in tbparams.keys() \
1830                    if tbparams[tb].has_key('federant') ],\
1831                    'vtopo': vtopo,\
1832                    'vis' : vis,
1833                    'owner': fid,
1834                    'experimentID' : [\
1835                            { 'fedid': expid }, { 'localname': eid },\
1836                        ],\
1837                }
1838        self.state[eid] = self.state[expid]
1839        if self.state_filename: self.write_state()
1840        self.state_lock.release()
1841
1842        self.auth.set_attribute(fid, expid)
1843        self.auth.set_attribute(expid, expid)
1844
1845        if not failed:
1846            return resp
1847        else:
1848            raise service_error(service_error.partial, \
1849                    "Partial swap in on %s" % ",".join(succeeded))
1850
1851    def check_experiment_access(self, fid, key):
1852        """
1853        Confirm that the fid has access to the experiment.  Though a request
1854        may be made in terms of a local name, the access attribute is always
1855        the experiment's fedid.
1856        """
1857        if not isinstance(key, fedid):
1858            self.state_lock.acquire()
1859            if self.state.has_key(key):
1860                try:
1861                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1862                            if f.has_key('fedid') ]
1863                except KeyError:
1864                    self.state_lock.release()
1865                    raise service_error(service_error.internal, 
1866                            "No fedid for experiment %s when checking " +\
1867                                    "access(!?)" % key)
1868                if len(kl) == 1:
1869                    key = kl[0]
1870                else:
1871                    self.state_lock.release()
1872                    raise service_error(service_error.internal, 
1873                            "multiple fedids for experiment %s when " +\
1874                                    "checking access(!?)" % key)
1875            else:
1876                self.state_lock.release()
1877                raise service_error(service_error.access, "Access Denied")
1878            self.state_lock.release()
1879
1880        if self.auth.check_attribute(fid, key):
1881            return True
1882        else:
1883            raise service_error(service_error.access, "Access Denied")
1884
1885
1886
1887    def get_vtopo(self, req, fid):
1888        """
1889        Return the stored virtual topology for this experiment
1890        """
1891        rv = None
1892
1893        req = req.get('VtopoRequestBody', None)
1894        if not req:
1895            raise service_error(service_error.req,
1896                    "Bad request format (no VtopoRequestBody)")
1897        exp = req.get('experiment', None)
1898        if exp:
1899            if exp.has_key('fedid'):
1900                key = exp['fedid']
1901                keytype = "fedid"
1902            elif exp.has_key('localname'):
1903                key = exp['localname']
1904                keytype = "localname"
1905            else:
1906                raise service_error(service_error.req, "Unknown lookup type")
1907        else:
1908            raise service_error(service_error.req, "No request?")
1909
1910        self.check_experiment_access(fid, key)
1911
1912        self.state_lock.acquire()
1913        if self.state.has_key(key):
1914            rv = { 'experiment' : {keytype: key },\
1915                    'vtopo': self.state[key]['vtopo'],\
1916                }
1917        self.state_lock.release()
1918
1919        if rv: return rv
1920        else: raise service_error(service_error.req, "No such experiment")
1921
1922    def get_vis(self, req, fid):
1923        """
1924        Return the stored visualization for this experiment
1925        """
1926        rv = None
1927
1928        req = req.get('VisRequestBody', None)
1929        if not req:
1930            raise service_error(service_error.req,
1931                    "Bad request format (no VisRequestBody)")
1932        exp = req.get('experiment', None)
1933        if exp:
1934            if exp.has_key('fedid'):
1935                key = exp['fedid']
1936                keytype = "fedid"
1937            elif exp.has_key('localname'):
1938                key = exp['localname']
1939                keytype = "localname"
1940            else:
1941                raise service_error(service_error.req, "Unknown lookup type")
1942        else:
1943            raise service_error(service_error.req, "No request?")
1944
1945        self.check_experiment_access(fid, key)
1946
1947        self.state_lock.acquire()
1948        if self.state.has_key(key):
1949            rv =  { 'experiment' : {keytype: key },\
1950                    'vis': self.state[key]['vis'],\
1951                    }
1952        self.state_lock.release()
1953
1954        if rv: return rv
1955        else: raise service_error(service_error.req, "No such experiment")
1956
1957    def get_info(self, req, fid):
1958        """
1959        Return all the stored info about this experiment
1960        """
1961        rv = None
1962
1963        req = req.get('InfoRequestBody', None)
1964        if not req:
1965            raise service_error(service_error.req,
1966                    "Bad request format (no VisRequestBody)")
1967        exp = req.get('experiment', None)
1968        if exp:
1969            if exp.has_key('fedid'):
1970                key = exp['fedid']
1971                keytype = "fedid"
1972            elif exp.has_key('localname'):
1973                key = exp['localname']
1974                keytype = "localname"
1975            else:
1976                raise service_error(service_error.req, "Unknown lookup type")
1977        else:
1978            raise service_error(service_error.req, "No request?")
1979
1980        self.check_experiment_access(fid, key)
1981
1982        # The state may be massaged by the service function that called
1983        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1984        # state.
1985        self.state_lock.acquire()
1986        if self.state.has_key(key):
1987            rv = copy.deepcopy(self.state[key])
1988        self.state_lock.release()
1989        # Remove the owner info
1990        del rv['owner']
1991        # remove the allocationID info from each federant
1992        for f in rv['federant']:
1993            if f.has_key('allocID'): del f['allocID']
1994
1995        if rv: return rv
1996        else: raise service_error(service_error.req, "No such experiment")
1997
1998
1999    def terminate_experiment(self, req, fid):
2000        """
2001        Swap this experiment out on the federants and delete the shared
2002        information
2003        """
2004        tbparams = { }
2005        req = req.get('TerminateRequestBody', None)
2006        if not req:
2007            raise service_error(service_error.req,
2008                    "Bad request format (no TerminateRequestBody)")
2009        exp = req.get('experiment', None)
2010        if exp:
2011            if exp.has_key('fedid'):
2012                key = exp['fedid']
2013                keytype = "fedid"
2014            elif exp.has_key('localname'):
2015                key = exp['localname']
2016                keytype = "localname"
2017            else:
2018                raise service_error(service_error.req, "Unknown lookup type")
2019        else:
2020            raise service_error(service_error.req, "No request?")
2021
2022        self.check_experiment_access(fid, key)
2023
2024        self.state_lock.acquire()
2025        fed_exp = self.state.get(key, None)
2026
2027        if fed_exp:
2028            # This branch of the conditional holds the lock to generate a
2029            # consistent temporary tbparams variable to deallocate experiments.
2030            # It releases the lock to do the deallocations and reacquires it to
2031            # remove the experiment state when the termination is complete.
2032            ids = []
2033            #  experimentID is a list of dicts that are self-describing
2034            #  identifiers.  This finds all the fedids and localnames - the
2035            #  keys of self.state - and puts them into ids.
2036            for id in fed_exp.get('experimentID', []):
2037                if id.has_key('fedid'): ids.append(id['fedid'])
2038                if id.has_key('localname'): ids.append(id['localname'])
2039
2040            # Construct enough of the tbparams to make the stop_segment calls
2041            # work
2042            for fed in fed_exp['federant']:
2043                try:
2044                    for e in fed['name']:
2045                        eid = e.get('localname', None)
2046                        if eid: break
2047                    else:
2048                        continue
2049
2050                    p = fed['emulab']['project']
2051
2052                    project = p['name']['localname']
2053                    tb = p['testbed']['localname']
2054                    user = p['user'][0]['userID']['localname']
2055
2056                    domain = fed['emulab']['domain']
2057                    host  = fed['emulab']['ops']
2058                    aid = fed['allocID']
2059                except KeyError, e:
2060                    continue
2061                tbparams[tb] = {\
2062                        'user': user,\
2063                        'domain': domain,\
2064                        'project': project,\
2065                        'host': host,\
2066                        'eid': eid,\
2067                        'aid': aid,\
2068                    }
2069            self.state_lock.release()
2070
2071            # Stop everyone.
2072            thread_pool = self.thread_pool(self.nthreads)
2073            for tb in tbparams.keys():
2074                # Create and start a thread to stop the segment
2075                thread_pool.wait_for_slot()
2076                t  = self.pooled_thread(target=self.stop_segment, 
2077                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2078                        pdata=thread_pool, trace_file=self.trace_file)
2079                t.start()
2080            # Wait for completions
2081            thread_pool.wait_for_all_done()
2082
2083            # release the allocations
2084            for tb in tbparams.keys():
2085                self.release_access(tb, tbparams[tb]['aid'])
2086
2087            # Remove the terminated experiment
2088            self.state_lock.acquire()
2089            for id in ids:
2090                if self.state.has_key(id): del self.state[id]
2091
2092            if self.state_filename: self.write_state()
2093            self.state_lock.release()
2094
2095            return { 'experiment': exp }
2096        else:
2097            # Don't forget to release the lock
2098            self.state_lock.release()
2099            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.