source: fedd/federation/experiment_control.py @ 4450b24

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 4450b24 was 7c3d547, checked in by Ted Faber <faber@…>, 16 years ago

add a line to the client config file telling the master experiment ID

  • Property mode set to 100644
File size: 63.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167        self.thread_with_rv = experiment_control_local.pooled_thread
168        self.thread_pool = experiment_control_local.thread_pool
169
170        self.cert_file = config.get("experiment_control", "cert_file")
171        if self.cert_file:
172            self.cert_pwd = config.get("experiment_control", "cert_pwd")
173        else:
174            self.cert_file = config.get("globals", "cert_file")
175            self.cert_pwd = config.get("globals", "cert_pwd")
176
177        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
178                or config.get("globals", "trusted_certs")
179
180        self.exp_stem = "fed-stem"
181        self.log = logging.getLogger("fedd.experiment_control")
182        set_log_level(config, "experiment_control", self.log)
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193
194        self.debug = config.getboolean("experiment_control", "create_debug")
195        self.state_filename = config.get("experiment_control", 
196                "experiment_state")
197        self.splitter_url = config.get("experiment_control", "splitter_uri")
198        self.fedkit = config.get("experiment_control", "fedkit")
199        accessdb_file = config.get("experiment_control", "accessdb")
200
201        self.ssh_pubkey_file = config.get("experiment_control", 
202                "ssh_pubkey_file")
203        self.ssh_privkey_file = config.get("experiment_control",
204                "ssh_privkey_file")
205        # NB for internal master/slave ops, not experiment setup
206        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
207        self.state = { }
208        self.state_lock = Lock()
209        self.tclsh = "/usr/local/bin/otclsh"
210        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
211                config.get("experiment_control", "tcl_splitter",
212                        "/usr/testbed/lib/ns2ir/parse.tcl")
213        mapdb_file = config.get("experiment_control", "mapdb")
214        self.trace_file = sys.stderr
215
216        self.def_expstart = \
217                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
218                "/tmp/federate";
219        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
220                "FEDDIR/hosts";
221        self.def_gwstart = \
222                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
223                "/tmp/bridge.log";
224        self.def_mgwstart = \
225                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
226                "/tmp/bridge.log";
227        self.def_gwimage = "FBSD61-TUNNEL2";
228        self.def_gwtype = "pc";
229        self.local_access = { }
230
231        if auth:
232            self.auth = auth
233        else:
234            self.log.error(\
235                    "[access]: No authorizer initialized, creating local one.")
236            auth = authorizer()
237
238
239        if self.ssh_pubkey_file:
240            try:
241                f = open(self.ssh_pubkey_file, 'r')
242                self.ssh_pubkey = f.read()
243                f.close()
244            except IOError:
245                raise service_error(service_error.internal,
246                        "Cannot read sshpubkey")
247        else:
248            raise service_error(service_error.internal, 
249                    "No SSH public key file?")
250
251        if not self.ssh_privkey_file:
252            raise service_error(service_error.internal, 
253                    "No SSH public key file?")
254
255
256        if mapdb_file:
257            self.read_mapdb(mapdb_file)
258        else:
259            self.log.warn("[experiment_control] No testbed map, using defaults")
260            self.tbmap = { 
261                    'deter':'https://users.isi.deterlab.net:23235',
262                    'emulab':'https://users.isi.deterlab.net:23236',
263                    'ucb':'https://users.isi.deterlab.net:23237',
264                    }
265
266        if accessdb_file:
267                self.read_accessdb(accessdb_file)
268        else:
269            raise service_error(service_error.internal,
270                    "No accessdb specified in config")
271
272        # Grab saved state.  OK to do this w/o locking because it's read only
273        # and only one thread should be in existence that can see self.state at
274        # this point.
275        if self.state_filename:
276            self.read_state()
277
278        # Dispatch tables
279        self.soap_services = {\
280                'Create': soap_handler('Create', self.create_experiment),
281                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
282                'Vis': soap_handler('Vis', self.get_vis),
283                'Info': soap_handler('Info', self.get_info),
284                'Terminate': soap_handler('Terminate', 
285                    self.terminate_experiment),
286        }
287
288        self.xmlrpc_services = {\
289                'Create': xmlrpc_handler('Create', self.create_experiment),
290                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
291                'Vis': xmlrpc_handler('Vis', self.get_vis),
292                'Info': xmlrpc_handler('Info', self.get_info),
293                'Terminate': xmlrpc_handler('Terminate',
294                    self.terminate_experiment),
295        }
296
297    def copy_file(self, src, dest, size=1024):
298        """
299        Exceedingly simple file copy.
300        """
301        s = open(src,'r')
302        d = open(dest, 'w')
303
304        buf = "x"
305        while buf != "":
306            buf = s.read(size)
307            d.write(buf)
308        s.close()
309        d.close()
310
311    # Call while holding self.state_lock
312    def write_state(self):
313        """
314        Write a new copy of experiment state after copying the existing state
315        to a backup.
316
317        State format is a simple pickling of the state dictionary.
318        """
319        if os.access(self.state_filename, os.W_OK):
320            self.copy_file(self.state_filename, \
321                    "%s.bak" % self.state_filename)
322        try:
323            f = open(self.state_filename, 'w')
324            pickle.dump(self.state, f)
325        except IOError, e:
326            self.log.error("Can't write file %s: %s" % \
327                    (self.state_filename, e))
328        except pickle.PicklingError, e:
329            self.log.error("Pickling problem: %s" % e)
330        except TypeError, e:
331            self.log.error("Pickling problem (TypeError): %s" % e)
332
333    # Call while holding self.state_lock
334    def read_state(self):
335        """
336        Read a new copy of experiment state.  Old state is overwritten.
337
338        State format is a simple pickling of the state dictionary.
339        """
340        try:
341            f = open(self.state_filename, "r")
342            self.state = pickle.load(f)
343            self.log.debug("[read_state]: Read state from %s" % \
344                    self.state_filename)
345        except IOError, e:
346            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
347                    % (self.state_filename, e))
348        except pickle.UnpicklingError, e:
349            self.log.warning(("[read_state]: No saved state: " + \
350                    "Unpickling failed: %s") % e)
351       
352        for k in self.state.keys():
353            try:
354                # This list should only have one element in it, but phrasing it
355                # as a for loop doesn't cost much, really.  We have to find the
356                # fedid elements anyway.
357                for eid in [ f['fedid'] \
358                        for f in self.state[k]['experimentID']\
359                            if f.has_key('fedid') ]:
360                    self.auth.set_attribute(self.state[k]['owner'], eid)
361            except KeyError, e:
362                self.log.warning("[read_state]: State ownership or identity " +\
363                        "misformatted in %s: %s" % (self.state_filename, e))
364
365
366    def read_accessdb(self, accessdb_file):
367        """
368        Read the mapping from fedids that can create experiments to their name
369        in the 3-level access namespace.  All will be asserted from this
370        testbed and can include the local username and porject that will be
371        asserted on their behalf by this fedd.  Each fedid is also added to the
372        authorization system with the "create" attribute.
373        """
374        self.accessdb = {}
375        # These are the regexps for parsing the db
376        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
377        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
378                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
379        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
380                "\s*->\s*(" + name_expr + ")\s*$")
381        lineno = 0
382
383        # Parse the mappings and store in self.authdb, a dict of
384        # fedid -> (proj, user)
385        try:
386            f = open(accessdb_file, "r")
387            for line in f:
388                lineno += 1
389                line = line.strip()
390                if len(line) == 0 or line.startswith('#'):
391                    continue
392                m = project_line.match(line)
393                if m:
394                    fid = fedid(hexstr=m.group(1))
395                    project, user = m.group(2,3)
396                    if not self.accessdb.has_key(fid):
397                        self.accessdb[fid] = []
398                    self.accessdb[fid].append((project, user))
399                    continue
400
401                m = user_line.match(line)
402                if m:
403                    fid = fedid(hexstr=m.group(1))
404                    project = None
405                    user = m.group(2)
406                    if not self.accessdb.has_key(fid):
407                        self.accessdb[fid] = []
408                    self.accessdb[fid].append((project, user))
409                    continue
410                self.log.warn("[experiment_control] Error parsing access " +\
411                        "db %s at line %d" %  (accessdb_file, lineno))
412        except IOError:
413            raise service_error(service_error.internal,
414                    "Error opening/reading %s as experiment " +\
415                            "control accessdb" %  accessdb_file)
416        f.close()
417
418        # Initialize the authorization attributes
419        for fid in self.accessdb.keys():
420            self.auth.set_attribute(fid, 'create')
421
422    def read_mapdb(self, file):
423        """
424        Read a simple colon separated list of mappings for the
425        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
426        """
427
428        self.tbmap = { }
429        lineno =0
430        try:
431            f = open(file, "r")
432            for line in f:
433                lineno += 1
434                line = line.strip()
435                if line.startswith('#') or len(line) == 0:
436                    continue
437                try:
438                    label, url = line.split(':', 1)
439                    self.tbmap[label] = url
440                except ValueError, e:
441                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
442                            "map db: %s %s" % (lineno, line, e))
443        except IOError, e:
444            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
445                    "open %s: %s" % (file, e))
446        f.close()
447
448    def scp_file(self, file, user, host, dest=""):
449        """
450        scp a file to the remote host.  If debug is set the action is only
451        logged.
452        """
453
454        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
455                '-o', 'StrictHostKeyChecking yes', '-i', 
456                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
457        rv = 0
458
459        try:
460            dnull = open("/dev/null", "w")
461        except IOError:
462            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
463            dnull = Null
464
465        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
466        if not self.debug:
467            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
468
469        return rv == 0
470
471    def ssh_cmd(self, user, host, cmd, wname=None):
472        """
473        Run a remote command on host as user.  If debug is set, the action is
474        only logged.
475        """
476        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
477                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
478                (self.ssh_exec, self.ssh_privkey_file, 
479                        user, host, cmd)
480
481        try:
482            dnull = open("/dev/null", "r")
483        except IOError:
484            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
485            dnull = Null
486
487        self.log.debug("[ssh_cmd]: %s" % sh_str)
488        if not self.debug:
489            if dnull:
490                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
491            else:
492                sub = Popen(sh_str, shell=True)
493            return sub.wait() == 0
494        else:
495            return True
496
497    def ship_configs(self, host, user, src_dir, dest_dir):
498        """
499        Copy federant-specific configuration files to the federant.
500        """
501        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
502            return False
503        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
504            return False
505
506        for f in os.listdir(src_dir):
507            if os.path.isdir(f):
508                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
509                        "%s/%s" % (dest_dir, f)):
510                    return False
511            else:
512                if not self.scp_file("%s/%s" % (src_dir, f), 
513                        user, host, dest_dir):
514                    return False
515        return True
516
517    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
518        """
519        Start a sub-experiment on a federant.
520
521        Get the current state, modify or create as appropriate, ship data and
522        configs and start the experiment.  There are small ordering differences
523        based on the initial state of the sub-experiment.
524        """
525        # ops node in the federant
526        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
527        user = tbparams[tb]['user']     # federant user
528        pid = tbparams[tb]['project']   # federant project
529        # XXX
530        base_confs = ( "hosts",)
531        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
532        # command to test experiment state
533        expinfo_exec = "/usr/testbed/bin/expinfo" 
534        # Configuration directories on the remote machine
535        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
536        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
537        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
538        # Regular expressions to parse the expinfo response
539        state_re = re.compile("State:\s+(\w+)")
540        no_exp_re = re.compile("^No\s+such\s+experiment")
541        state = None    # Experiment state parsed from expinfo
542        # The expinfo ssh command.  Note the identity restriction to use only
543        # the identity provided in the pubkey given.
544        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
545                'StrictHostKeyChecking yes', '-i', 
546                self.ssh_privkey_file, "%s@%s" % (user, host), 
547                expinfo_exec, pid, eid]
548
549        # Get status
550        self.log.debug("[start_segment]: %s"% " ".join(cmd))
551        dev_null = None
552        try:
553            dev_null = open("/dev/null", "a")
554        except IOError, e:
555            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
556
557        if self.debug:
558            state = 'swapped'
559            rv = 0
560        else:
561            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
562            for line in status.stdout:
563                m = state_re.match(line)
564                if m: state = m.group(1)
565                else:
566                    m = no_exp_re.match(line)
567                    if m: state = "none"
568            rv = status.wait()
569
570        # If the experiment is not present the subcommand returns a non-zero
571        # return value.  If we successfully parsed a "none" outcome, ignore the
572        # return code.
573        if rv != 0 and state != "none":
574            raise service_error(service_error.internal,
575                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
576
577        self.log.debug("[start_segment]: %s: %s" % (tb, state))
578        self.log.info("[start_segment]:transferring experiment to %s" % tb)
579
580        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
581            return False
582        # Clear the federation config dirs
583        if not self.ssh_cmd(user, host, 
584                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
585            return False
586        # Clear and create the tarfiles and rpm directories
587        for d in (tarfiles_dir, rpms_dir):
588            if not self.ssh_cmd(user, host, 
589                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
590                return False
591            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
592                    "create tarfiles"):
593                return False
594       
595        if state == 'active':
596            # Create the federation config dirs (do not move outside the
597            # conditional.  Happens later in new expriment creation)
598            if not self.ssh_cmd(user, host, 
599                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
600                return False
601            # Remote experiment is active.  Modify it.
602            for f in base_confs:
603                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
604                        "%s/%s" % (proj_dir, f)):
605                    return False
606            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
607                    proj_dir):
608                return False
609            if os.path.isdir("%s/tarfiles" % tmpdir):
610                if not self.ship_configs(host, user,
611                        "%s/tarfiles" % tmpdir, tarfiles_dir):
612                    return False
613            if os.path.isdir("%s/rpms" % tmpdir):
614                if not self.ship_configs(host, user,
615                        "%s/rpms" % tmpdir, tarfiles_dir):
616                    return False
617            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
618            if not self.ssh_cmd(user, host,
619                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
620                            (pid, eid, tclfile), "modexp"):
621                return False
622            return True
623        elif state == "swapped":
624            # Create the federation config dirs (do not move outside the
625            # conditional.  Happens later in new expriment creation)
626            if not self.ssh_cmd(user, host, 
627                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
628                return False
629            # Remote experiment swapped out.  Modify it and swap it in.
630            for f in base_confs:
631                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
632                        "%s/%s" % (proj_dir, f)):
633                    return False
634            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
635                    proj_dir):
636                return False
637            if os.path.isdir("%s/tarfiles" % tmpdir):
638                if not self.ship_configs(host, user,
639                        "%s/tarfiles" % tmpdir, tarfiles_dir):
640                    return False
641            if os.path.isdir("%s/rpms" % tmpdir):
642                if not self.ship_configs(host, user,
643                        "%s/rpms" % tmpdir, tarfiles_dir):
644                    return False
645            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
646            if not self.ssh_cmd(user, host,
647                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
648                    "modexp"):
649                return False
650            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
651            if not self.ssh_cmd(user, host,
652                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
653                    "swapexp"):
654                return False
655            return True
656        elif state == "none":
657            # No remote experiment.  Create one.  We do this in 2 steps so we
658            # can put the configuration files and scripts into the new
659            # experiment directories.
660
661            # Tarfiles must be present for creation to work
662            if os.path.isdir("%s/tarfiles" % tmpdir):
663                if not self.ship_configs(host, user,
664                        "%s/tarfiles" % tmpdir, tarfiles_dir):
665                    return False
666            if os.path.isdir("%s/rpms" % tmpdir):
667                if not self.ship_configs(host, user,
668                        "%s/rpms" % tmpdir, tarfiles_dir):
669                    return False
670            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
671            if not self.ssh_cmd(user, host,
672                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
673                            (pid, eid, tclfile), "startexp"):
674                return False
675            # Create the federation config dirs (do not move outside the
676            # conditional.)
677            if not self.ssh_cmd(user, host, 
678                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
679                return False
680            # After startexp the per-experiment directories exist
681            for f in base_confs:
682                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
683                        "%s/%s" % (proj_dir, f)):
684                    return False
685            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
686                    proj_dir):
687                return False
688            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
689            if not self.ssh_cmd(user, host,
690                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
691                    "swapexp"):
692                return False
693            return True
694        else:
695            self.log.debug("[start_segment]:unknown state %s" % state)
696            return False
697
698    def stop_segment(self, tb, eid, tbparams):
699        """
700        Stop a sub experiment by calling swapexp on the federant
701        """
702        user = tbparams[tb]['user']
703        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
704        pid = tbparams[tb]['project']
705
706        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
707        return self.ssh_cmd(user, host,
708                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
709
710       
711    def generate_ssh_keys(self, dest, type="rsa" ):
712        """
713        Generate a set of keys for the gateways to use to talk.
714
715        Keys are of type type and are stored in the required dest file.
716        """
717        valid_types = ("rsa", "dsa")
718        t = type.lower();
719        if t not in valid_types: raise ValueError
720        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
721
722        try:
723            trace = open("/dev/null", "w")
724        except IOError:
725            raise service_error(service_error.internal,
726                    "Cannot open /dev/null??");
727
728        # May raise CalledProcessError
729        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
730        rv = call(cmd, stdout=trace, stderr=trace)
731        if rv != 0:
732            raise service_error(service_error.internal, 
733                    "Cannot generate nonce ssh keys.  %s return code %d" \
734                            % (self.ssh_keygen, rv))
735
736    def gentopo(self, str):
737        """
738        Generate the topology dtat structure from the splitter's XML
739        representation of it.
740
741        The topology XML looks like:
742            <experiment>
743                <nodes>
744                    <node><vname></vname><ips>ip1:ip2</ips></node>
745                </nodes>
746                <lans>
747                    <lan>
748                        <vname></vname><vnode></vnode><ip></ip>
749                        <bandwidth></bandwidth><member>node:port</member>
750                    </lan>
751                </lans>
752        """
753        class topo_parse:
754            """
755            Parse the topology XML and create the dats structure.
756            """
757            def __init__(self):
758                # Typing of the subelements for data conversion
759                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
760                self.int_subelements = ( 'bandwidth',)
761                self.float_subelements = ( 'delay',)
762                # The final data structure
763                self.nodes = [ ]
764                self.lans =  [ ]
765                self.topo = { \
766                        'node': self.nodes,\
767                        'lan' : self.lans,\
768                    }
769                self.element = { }  # Current element being created
770                self.chars = ""     # Last text seen
771
772            def end_element(self, name):
773                # After each sub element the contents is added to the current
774                # element or to the appropriate list.
775                if name == 'node':
776                    self.nodes.append(self.element)
777                    self.element = { }
778                elif name == 'lan':
779                    self.lans.append(self.element)
780                    self.element = { }
781                elif name in self.str_subelements:
782                    self.element[name] = self.chars
783                    self.chars = ""
784                elif name in self.int_subelements:
785                    self.element[name] = int(self.chars)
786                    self.chars = ""
787                elif name in self.float_subelements:
788                    self.element[name] = float(self.chars)
789                    self.chars = ""
790
791            def found_chars(self, data):
792                self.chars += data.rstrip()
793
794
795        tp = topo_parse();
796        parser = xml.parsers.expat.ParserCreate()
797        parser.EndElementHandler = tp.end_element
798        parser.CharacterDataHandler = tp.found_chars
799
800        parser.Parse(str)
801
802        return tp.topo
803       
804
805    def genviz(self, topo):
806        """
807        Generate the visualization the virtual topology
808        """
809
810        neato = "/usr/local/bin/neato"
811        # These are used to parse neato output and to create the visualization
812        # file.
813        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
814        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
815                "%s</type></node>"
816
817        try:
818            # Node names
819            nodes = [ n['vname'] for n in topo['node'] ]
820            topo_lans = topo['lan']
821        except KeyError:
822            raise service_error(service_error.internal, "Bad topology")
823
824        lans = { }
825        links = { }
826
827        # Walk through the virtual topology, organizing the connections into
828        # 2-node connections (links) and more-than-2-node connections (lans).
829        # When a lan is created, it's added to the list of nodes (there's a
830        # node in the visualization for the lan).
831        for l in topo_lans:
832            if links.has_key(l['vname']):
833                if len(links[l['vname']]) < 2:
834                    links[l['vname']].append(l['vnode'])
835                else:
836                    nodes.append(l['vname'])
837                    lans[l['vname']] = links[l['vname']]
838                    del links[l['vname']]
839                    lans[l['vname']].append(l['vnode'])
840            elif lans.has_key(l['vname']):
841                lans[l['vname']].append(l['vnode'])
842            else:
843                links[l['vname']] = [ l['vnode'] ]
844
845
846        # Open up a temporary file for dot to turn into a visualization
847        try:
848            df, dotname = tempfile.mkstemp()
849            dotfile = os.fdopen(df, 'w')
850        except IOError:
851            raise service_error(service_error.internal,
852                    "Failed to open file in genviz")
853
854        # Generate a dot/neato input file from the links, nodes and lans
855        try:
856            print >>dotfile, "graph G {"
857            for n in nodes:
858                print >>dotfile, '\t"%s"' % n
859            for l in links.keys():
860                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
861            for l in lans.keys():
862                for n in lans[l]:
863                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
864            print >>dotfile, "}"
865            dotfile.close()
866        except TypeError:
867            raise service_error(service_error.internal,
868                    "Single endpoint link in vtopo")
869        except IOError:
870            raise service_error(service_error.internal, "Cannot write dot file")
871
872        # Use dot to create a visualization
873        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
874                '-Gpack=true', dotname], stdout=PIPE)
875
876        # Translate dot to vis format
877        vis_nodes = [ ]
878        vis = { 'node': vis_nodes }
879        for line in dot.stdout:
880            m = vis_re.match(line)
881            if m:
882                vn = m.group(1)
883                vis_node = {'name': vn, \
884                        'x': float(m.group(2)),\
885                        'y' : float(m.group(3)),\
886                    }
887                if vn in links.keys() or vn in lans.keys():
888                    vis_node['type'] = 'lan'
889                else:
890                    vis_node['type'] = 'node'
891                vis_nodes.append(vis_node)
892        rv = dot.wait()
893
894        os.remove(dotname)
895        if rv == 0 : return vis
896        else: return None
897
898    def get_access(self, tb, nodes, user, tbparam, master, export_project,
899            access_user):
900        """
901        Get access to testbed through fedd and set the parameters for that tb
902        """
903
904        # XXX This is needless complexity.  It must vanish.
905        translate_attr = {
906            'slavenodestartcmd': 'expstart',
907            'slaveconnectorstartcmd': 'gwstart',
908            'masternodestartcmd': 'mexpstart',
909            'masterconnectorstartcmd': 'mgwstart',
910            'connectorimage': 'gwimage',
911            'connectortype': 'gwtype',
912            'tunnelcfg': 'tun',
913            'tunnelinterface': 'tunnelinterface',
914            'smbshare': 'smbshare',
915            'slaveconnectorcmd': 'gwcmd',
916            'slaveconnectorcmdparams': 'gwcmdparams',
917            'masterconnectorcmd': 'mgwcmd',
918            'masterconnectorcmdparams': 'mgwcmdparams',
919        }
920
921        uri = self.tbmap.get(tb, None)
922        if not uri:
923            raise service_error(serice_error.server_config, 
924                    "Unknown testbed: %s" % tb)
925
926        # currently this lumps all users into one service access group
927        service_keys = [ a for u in user \
928                for a in u.get('access', []) \
929                    if a.has_key('sshPubkey')]
930
931        if len(service_keys) == 0:
932            raise service_error(service_error.req, 
933                    "Must have at least one SSH pubkey for services")
934
935
936        for p, u in access_user:
937            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
938                    "to %s") %  ((p or "None"), u, uri))
939
940            if p:
941                # Request with user and project specified
942                req = {\
943                        'destinationTestbed' : { 'uri' : uri },
944                        'project': { 
945                            'name': {'localname': p},
946                            'user': [ {'userID': { 'localname': u } } ],
947                            },
948                        'user':  user,
949                        'allocID' : { 'localname': 'test' },
950                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
951                        'serviceAccess' : service_keys
952                    }
953            else:
954                # Request with only user specified
955                req = {\
956                        'destinationTestbed' : { 'uri' : uri },
957                        'user':  [ {'userID': { 'localname': u } } ],
958                        'allocID' : { 'localname': 'test' },
959                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
960                        'serviceAccess' : service_keys
961                    }
962
963            if tb == master:
964                # NB, the export_project parameter is a dict that includes
965                # the type
966                req['exportProject'] = export_project
967
968            # node resources if any
969            if nodes != None and len(nodes) > 0:
970                rnodes = [ ]
971                for n in nodes:
972                    rn = { }
973                    image, hw, count = n.split(":")
974                    if image: rn['image'] = [ image ]
975                    if hw: rn['hardware'] = [ hw ]
976                    if count and int(count) >0 : rn['count'] = int(count)
977                    rnodes.append(rn)
978                req['resources']= { }
979                req['resources']['node'] = rnodes
980
981            try:
982                if self.local_access.has_key(uri):
983                    # Local access call
984                    req = { 'RequestAccessRequestBody' : req }
985                    r = self.local_access[uri].RequestAccess(req, 
986                            fedid(file=self.cert_file))
987                    r = { 'RequestAccessResponseBody' : r }
988                else:
989                    r = self.call_RequestAccess(uri, req, 
990                            self.cert_file, self.cert_pwd, self.trusted_certs)
991            except service_error, e:
992                if e.code == service_error.access:
993                    self.log.debug("[get_access] Access denied")
994                    r = None
995                    continue
996                else:
997                    raise e
998
999            if r.has_key('RequestAccessResponseBody'):
1000                # Through to here we have a valid response, not a fault.
1001                # Access denied is a fault, so something better or worse than
1002                # access denied has happened.
1003                r = r['RequestAccessResponseBody']
1004                self.log.debug("[get_access] Access granted")
1005                break
1006            else:
1007                raise service_error(service_error.protocol,
1008                        "Bad proxy response")
1009       
1010        if not r:
1011            raise service_error(service_error.access, 
1012                    "Access denied by %s (%s)" % (tb, uri))
1013
1014        e = r['emulab']
1015        p = e['project']
1016        tbparam[tb] = { 
1017                "boss": e['boss'],
1018                "host": e['ops'],
1019                "domain": e['domain'],
1020                "fs": e['fileServer'],
1021                "eventserver": e['eventServer'],
1022                "project": unpack_id(p['name']),
1023                "emulab" : e,
1024                "allocID" : r['allocID'],
1025                }
1026        # Make the testbed name be the label the user applied
1027        p['testbed'] = {'localname': tb }
1028
1029        for u in p['user']:
1030            role = u.get('role', None)
1031            if role == 'experimentCreation':
1032                tbparam[tb]['user'] = unpack_id(u['userID'])
1033                break
1034        else:
1035            raise service_error(service_error.internal, 
1036                    "No createExperimentUser from %s" %tb)
1037
1038        for a in e['fedAttr']:
1039            if a['attribute']:
1040                key = translate_attr.get(a['attribute'].lower(), None)
1041                if key:
1042                    tbparam[tb][key]= a['value']
1043       
1044    def release_access(self, tb, aid):
1045        """
1046        Release access to testbed through fedd
1047        """
1048
1049        uri = self.tbmap.get(tb, None)
1050        if not uri:
1051            raise service_error(serice_error.server_config, 
1052                    "Unknown testbed: %s" % tb)
1053
1054        if self.local_access.has_key(uri):
1055            resp = self.local_access[uri].ReleaseAccess(\
1056                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1057                    fedid(file=self.cert_file))
1058            resp = { 'ReleaseAccessResponseBody': resp } 
1059        else:
1060            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1061                    self.cert_file, self.cert_pwd, self.trusted_certs)
1062
1063        # better error coding
1064
1065    def remote_splitter(self, uri, desc, master):
1066
1067        req = {
1068                'description' : { 'ns2description': desc },
1069                'master': master,
1070                'include_fedkit': bool(self.fedkit)
1071            }
1072
1073        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1074                self.trusted_certs)
1075
1076        if r.has_key('Ns2SplitResponseBody'):
1077            r = r['Ns2SplitResponseBody']
1078            if r.has_key('output'):
1079                return r['output'].splitlines()
1080            else:
1081                raise service_error(service_error.protocol, 
1082                        "Bad splitter response (no output)")
1083        else:
1084            raise service_error(service_error.protocol, "Bad splitter response")
1085       
1086    class current_testbed:
1087        """
1088        Object for collecting the current testbed description.  The testbed
1089        description is saved to a file with the local testbed variables
1090        subsittuted line by line.
1091        """
1092        def __init__(self, eid, tmpdir, fedkit):
1093            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1094            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1095            self.current_testbed = None
1096            self.testbed_file = None
1097
1098            self.def_expstart = \
1099                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1100            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1101            self.def_gwstart = \
1102                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1103            self.def_mgwstart = \
1104                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1105            self.def_gwimage = "FBSD61-TUNNEL2";
1106            self.def_gwtype = "pc";
1107            self.def_mgwcmd = '# '
1108            self.def_mgwcmdparams = ''
1109            self.def_gwcmd = '# '
1110            self.def_gwcmdparams = ''
1111
1112            self.eid = eid
1113            self.tmpdir = tmpdir
1114            self.fedkit = fedkit
1115
1116        def __call__(self, line, master, allocated, tbparams):
1117            # Capture testbed topology descriptions
1118            if self.current_testbed == None:
1119                m = self.begin_testbed.match(line)
1120                if m != None:
1121                    self.current_testbed = m.group(1)
1122                    if self.current_testbed == None:
1123                        raise service_error(service_error.req,
1124                                "Bad request format (unnamed testbed)")
1125                    allocated[self.current_testbed] = \
1126                            allocated.get(self.current_testbed,0) + 1
1127                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1128                    if not os.path.exists(tb_dir):
1129                        try:
1130                            os.mkdir(tb_dir)
1131                        except IOError:
1132                            raise service_error(service_error.internal,
1133                                    "Cannot create %s" % tb_dir)
1134                    try:
1135                        self.testbed_file = open("%s/%s.%s.tcl" %
1136                                (tb_dir, self.eid, self.current_testbed), 'w')
1137                    except IOError:
1138                        self.testbed_file = None
1139                    return True
1140                else: return False
1141            else:
1142                m = self.end_testbed.match(line)
1143                if m != None:
1144                    if m.group(1) != self.current_testbed:
1145                        raise service_error(service_error.internal, 
1146                                "Mismatched testbed markers!?")
1147                    if self.testbed_file != None: 
1148                        self.testbed_file.close()
1149                        self.testbed_file = None
1150                    self.current_testbed = None
1151                elif self.testbed_file:
1152                    # Substitute variables and put the line into the local
1153                    # testbed file.
1154                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1155                            self.def_gwtype)
1156                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1157                            self.def_gwimage)
1158                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1159                            self.def_mgwstart)
1160                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1161                            self.def_mexpstart)
1162                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1163                            self.def_gwstart)
1164                    expstart = tbparams[self.current_testbed].get('expstart', 
1165                            self.def_expstart)
1166                    project = tbparams[self.current_testbed].get('project')
1167                    gwcmd = tbparams[self.current_testbed].get('gwcmd', 
1168                            self.def_gwcmd)
1169                    gwcmdparams = tbparams[self.current_testbed].get(\
1170                            'gwcmdparams', self.def_gwcmdparams)
1171                    mgwcmd = tbparams[self.current_testbed].get('mgwcmd', 
1172                            self.def_gwcmd)
1173                    mgwcmdparams = tbparams[self.current_testbed].get(\
1174                            'mgwcmdparams', self.def_gwcmdparams)
1175                    line = re.sub("GWTYPE", gwtype, line)
1176                    line = re.sub("GWIMAGE", gwimage, line)
1177                    if self.current_testbed == master:
1178                        line = re.sub("GWSTART", mgwstart, line)
1179                        line = re.sub("EXPSTART", mexpstart, line)
1180                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1181                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1182                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1183                    else:
1184                        line = re.sub("GWSTART", gwstart, line)
1185                        line = re.sub("EXPSTART", expstart, line)
1186                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1187                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1188                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1189                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1190                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1191                    line = re.sub("EID", self.eid, line)
1192                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1193                            (project, self.eid), line)
1194                    if self.fedkit:
1195                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1196                                line)
1197                    print >>self.testbed_file, line
1198                return True
1199
1200    class allbeds:
1201        """
1202        Process the Allbeds section.  Get access to each federant and save the
1203        parameters in tbparams
1204        """
1205        def __init__(self, get_access):
1206            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1207            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1208            self.in_allbeds = False
1209            self.get_access = get_access
1210
1211        def __call__(self, line, user, tbparams, master, export_project,
1212                access_user):
1213            # Testbed access parameters
1214            if not self.in_allbeds:
1215                if self.begin_allbeds.match(line):
1216                    self.in_allbeds = True
1217                    return True
1218                else:
1219                    return False
1220            else:
1221                if self.end_allbeds.match(line):
1222                    self.in_allbeds = False
1223                else:
1224                    nodes = line.split('|')
1225                    tb = nodes.pop(0)
1226                    self.get_access(tb, nodes, user, tbparams, master,
1227                            export_project, access_user)
1228                return True
1229
1230    class gateways:
1231        def __init__(self, eid, master, tmpdir, gw_pubkey,
1232                gw_secretkey, copy_file, fedkit):
1233            self.begin_gateways = \
1234                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1235            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1236            self.current_gateways = None
1237            self.control_gateway = None
1238            self.active_end = { }
1239
1240            self.eid = eid
1241            self.master = master
1242            self.tmpdir = tmpdir
1243            self.gw_pubkey_base = gw_pubkey
1244            self.gw_secretkey_base = gw_secretkey
1245
1246            self.copy_file = copy_file
1247            self.fedkit = fedkit
1248
1249
1250        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1251                active_end, tbparams, dtb, myname, desthost, type):
1252            """
1253            Produce a gateway configuration file from a gateways line.
1254            """
1255
1256            sproject = tbparams[gw].get('project', 'project')
1257            dproject = tbparams[dtb].get('project', 'project')
1258            sdomain = ".%s.%s%s" % (eid, sproject,
1259                    tbparams[gw].get('domain', ".example.com"))
1260            ddomain = ".%s.%s%s" % (eid, dproject,
1261                    tbparams[dtb].get('domain', ".example.com"))
1262            boss = tbparams[master].get('boss', "boss")
1263            fs = tbparams[master].get('fs', "fs")
1264            event_server = "%s%s" % \
1265                    (tbparams[gw].get('eventserver', "event_server"),
1266                            tbparams[gw].get('domain', "example.com"))
1267            remote_event_server = "%s%s" % \
1268                    (tbparams[dtb].get('eventserver', "event_server"),
1269                            tbparams[dtb].get('domain', "example.com"))
1270            seer_control = "%s%s" % \
1271                    (tbparams[gw].get('control', "control"), sdomain)
1272            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1273
1274            if self.fedkit:
1275                remote_script_dir = "/usr/local/federation/bin"
1276                local_script_dir = "/usr/local/federation/bin"
1277            else:
1278                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1279                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1280
1281            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1282            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1283            tunnel_cfg = tbparams[gw].get("tun", "false")
1284
1285            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1286            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1287
1288            # translate to lower case so the `hostname` hack for specifying
1289            # configuration files works.
1290            conf_file = conf_file.lower();
1291            remote_conf_file = remote_conf_file.lower();
1292
1293            if dtb == master:
1294                active = "false"
1295            elif gw == master:
1296                active = "true"
1297            elif active_end.has_key('%s-%s' % (dtb, gw)):
1298                active = "false"
1299            else:
1300                active_end['%s-%s' % (gw, dtb)] = 1
1301                active = "true"
1302
1303            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1304            print >>gwconfig, "Active: %s" % active
1305            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1306            if tunnel_iface:
1307                print >>gwconfig, "Interface: %s" % tunnel_iface
1308            print >>gwconfig, "BossName: %s" % boss
1309            print >>gwconfig, "FsName: %s" % fs
1310            print >>gwconfig, "EventServerName: %s" % event_server
1311            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1312            print >>gwconfig, "SeerControl: %s" % seer_control
1313            print >>gwconfig, "Type: %s" % type
1314            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1315            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1316                    local_script_dir
1317            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1318            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1319            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1320                    (remote_conf_dir, remote_conf_file)
1321            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1322            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1323            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1324            gwconfig.close()
1325
1326            return active == "true"
1327
1328        def __call__(self, line, allocated, tbparams):
1329            # Process gateways
1330            if not self.current_gateways:
1331                m = self.begin_gateways.match(line)
1332                if m:
1333                    self.current_gateways = m.group(1)
1334                    if allocated.has_key(self.current_gateways):
1335                        # This test should always succeed
1336                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1337                        if not os.path.exists(tb_dir):
1338                            try:
1339                                os.mkdir(tb_dir)
1340                            except IOError:
1341                                raise service_error(service_error.internal,
1342                                        "Cannot create %s" % tb_dir)
1343                    else:
1344                        # XXX
1345                        self.log.error("[gateways]: Ignoring gateways for " + \
1346                                "unknown testbed %s" % self.current_gateways)
1347                        self.current_gateways = None
1348                    return True
1349                else:
1350                    return False
1351            else:
1352                m = self.end_gateways.match(line)
1353                if m :
1354                    if m.group(1) != self.current_gateways:
1355                        raise service_error(service_error.internal,
1356                                "Mismatched gateway markers!?")
1357                    if self.control_gateway:
1358                        try:
1359                            cc = open("%s/%s/client.conf" %
1360                                    (self.tmpdir, self.current_gateways), 'w')
1361                            print >>cc, "ControlGateway: %s" % \
1362                                    self.control_gateway
1363                            if tbparams[self.master].has_key('smbshare'):
1364                                print >>cc, "SMBSHare: %s" % \
1365                                        tbparams[self.master]['smbshare']
1366                            print >>cc, "ProjectUser: %s" % \
1367                                    tbparams[self.master]['user']
1368                            print >>cc, "ProjectName: %s" % \
1369                                    tbparams[self.master]['project']
1370                            print >>cc, "ExperimentID: %s/%s" % \
1371                                    ( tbparams[self.master]['project'], \
1372                                    self.eid )
1373                            cc.close()
1374                        except IOError:
1375                            raise service_error(service_error.internal,
1376                                    "Error creating client config")
1377                        # XXX: This seer specific file should disappear
1378                        try:
1379                            cc = open("%s/%s/seer.conf" %
1380                                    (self.tmpdir, self.current_gateways),
1381                                    'w')
1382                            if self.current_gateways != self.master:
1383                                print >>cc, "ControlNode: %s" % \
1384                                        self.control_gateway
1385                            print >>cc, "ExperimentID: %s/%s" % \
1386                                    ( tbparams[self.master]['project'], \
1387                                    self.eid )
1388                            cc.close()
1389                        except IOError:
1390                            raise service_error(service_error.internal,
1391                                    "Error creating seer config")
1392                    else:
1393                        debug.error("[gateways]: No control gateway for %s" %\
1394                                    self.current_gateways)
1395                    self.current_gateways = None
1396                else:
1397                    dtb, myname, desthost, type = line.split(" ")
1398
1399                    if type == "control" or type == "both":
1400                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1401                                self.eid, 
1402                                tbparams[self.current_gateways]['project'],
1403                                tbparams[self.current_gateways]['domain'])
1404                    try:
1405                        active = self.gateway_conf_file(self.current_gateways,
1406                                self.master, self.eid, self.gw_pubkey_base,
1407                                self.gw_secretkey_base,
1408                                self.active_end, tbparams, dtb, myname,
1409                                desthost, type)
1410                    except IOError, e:
1411                        raise service_error(service_error.internal,
1412                                "Failed to write config file for %s" % \
1413                                        self.current_gateway)
1414           
1415                    gw_pubkey = "%s/keys/%s" % \
1416                            (self.tmpdir, self.gw_pubkey_base)
1417                    gw_secretkey = "%s/keys/%s" % \
1418                            (self.tmpdir, self.gw_secretkey_base)
1419
1420                    pkfile = "%s/%s/%s" % \
1421                            ( self.tmpdir, self.current_gateways, 
1422                                    self.gw_pubkey_base)
1423                    skfile = "%s/%s/%s" % \
1424                            ( self.tmpdir, self.current_gateways, 
1425                                    self.gw_secretkey_base)
1426
1427                    if not os.path.exists(pkfile):
1428                        try:
1429                            self.copy_file(gw_pubkey, pkfile)
1430                        except IOError:
1431                            service_error(service_error.internal,
1432                                    "Failed to copy pubkey file")
1433
1434                    if active and not os.path.exists(skfile):
1435                        try:
1436                            self.copy_file(gw_secretkey, skfile)
1437                        except IOError:
1438                            service_error(service_error.internal,
1439                                    "Failed to copy secretkey file")
1440                return True
1441
1442    class shunt_to_file:
1443        """
1444        Simple class to write data between two regexps to a file.
1445        """
1446        def __init__(self, begin, end, filename):
1447            """
1448            Begin shunting on a match of begin, stop on end, send data to
1449            filename.
1450            """
1451            self.begin = re.compile(begin)
1452            self.end = re.compile(end)
1453            self.in_shunt = False
1454            self.file = None
1455            self.filename = filename
1456
1457        def __call__(self, line):
1458            """
1459            Call this on each line in the input that may be shunted.
1460            """
1461            if not self.in_shunt:
1462                if self.begin.match(line):
1463                    self.in_shunt = True
1464                    try:
1465                        self.file = open(self.filename, "w")
1466                    except:
1467                        self.file = None
1468                        raise
1469                    return True
1470                else:
1471                    return False
1472            else:
1473                if self.end.match(line):
1474                    if self.file: 
1475                        self.file.close()
1476                        self.file = None
1477                    self.in_shunt = False
1478                else:
1479                    if self.file:
1480                        print >>self.file, line
1481                return True
1482
1483    class shunt_to_list:
1484        """
1485        Same interface as shunt_to_file.  Data collected in self.list, one list
1486        element per line.
1487        """
1488        def __init__(self, begin, end):
1489            self.begin = re.compile(begin)
1490            self.end = re.compile(end)
1491            self.in_shunt = False
1492            self.list = [ ]
1493       
1494        def __call__(self, line):
1495            if not self.in_shunt:
1496                if self.begin.match(line):
1497                    self.in_shunt = True
1498                    return True
1499                else:
1500                    return False
1501            else:
1502                if self.end.match(line):
1503                    self.in_shunt = False
1504                else:
1505                    self.list.append(line)
1506                return True
1507
1508    class shunt_to_string:
1509        """
1510        Same interface as shunt_to_file.  Data collected in self.str, all in
1511        one string.
1512        """
1513        def __init__(self, begin, end):
1514            self.begin = re.compile(begin)
1515            self.end = re.compile(end)
1516            self.in_shunt = False
1517            self.str = ""
1518       
1519        def __call__(self, line):
1520            if not self.in_shunt:
1521                if self.begin.match(line):
1522                    self.in_shunt = True
1523                    return True
1524                else:
1525                    return False
1526            else:
1527                if self.end.match(line):
1528                    self.in_shunt = False
1529                else:
1530                    self.str += line
1531                return True
1532
1533    def create_experiment(self, req, fid):
1534        """
1535        The external interface to experiment creation called from the
1536        dispatcher.
1537
1538        Creates a working directory, splits the incoming description using the
1539        splitter script and parses out the avrious subsections using the
1540        lcasses above.  Once each sub-experiment is created, use pooled threads
1541        to instantiate them and start it all up.
1542        """
1543
1544        if not self.auth.check_attribute(fid, 'create'):
1545            raise service_error(service_error.access, "Create access denied")
1546
1547        try:
1548            tmpdir = tempfile.mkdtemp(prefix="split-")
1549        except IOError:
1550            raise service_error(service_error.internal, "Cannot create tmp dir")
1551
1552        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1553        gw_secretkey_base = "fed.%s" % self.ssh_type
1554        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1555        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1556        tclfile = tmpdir + "/experiment.tcl"
1557        tbparams = { }
1558        try:
1559            access_user = self.accessdb[fid]
1560        except KeyError:
1561            raise service_error(service_error.internal,
1562                    "Access map and authorizer out of sync in " + \
1563                            "create_experiment for fedid %s"  % fid)
1564
1565        pid = "dummy"
1566        gid = "dummy"
1567        # XXX
1568        fail_soft = False
1569
1570        try:
1571            os.mkdir(tmpdir+"/keys")
1572        except OSError:
1573            raise service_error(service_error.internal,
1574                    "Can't make temporary dir")
1575
1576        req = req.get('CreateRequestBody', None)
1577        if not req:
1578            raise service_error(service_error.req,
1579                    "Bad request format (no CreateRequestBody)")
1580        # The tcl parser needs to read a file so put the content into that file
1581        descr=req.get('experimentdescription', None)
1582        if descr:
1583            file_content=descr.get('ns2description', None)
1584            if file_content:
1585                try:
1586                    f = open(tclfile, 'w')
1587                    f.write(file_content)
1588                    f.close()
1589                except IOError:
1590                    raise service_error(service_error.internal,
1591                            "Cannot write temp experiment description")
1592            else:
1593                raise service_error(service_error.req, 
1594                        "Only ns2descriptions supported")
1595        else:
1596            raise service_error(service_error.req, "No experiment description")
1597
1598        if req.has_key('experimentID') and \
1599                req['experimentID'].has_key('localname'):
1600            eid = req['experimentID']['localname']
1601            self.state_lock.acquire()
1602            while (self.state.has_key(eid)):
1603                eid += random.choice(string.ascii_letters)
1604            # To avoid another thread picking this localname
1605            self.state[eid] = "placeholder"
1606            self.state_lock.release()
1607        else:
1608            eid = self.exp_stem
1609            for i in range(0,5):
1610                eid += random.choice(string.ascii_letters)
1611            self.state_lock.acquire()
1612            while (self.state.has_key(eid)):
1613                eid = self.exp_stem
1614                for i in range(0,5):
1615                    eid += random.choice(string.ascii_letters)
1616            # To avoid another thread picking this localname
1617            self.state[eid] = "placeholder"
1618            self.state_lock.release()
1619
1620        try: 
1621            # This catches exceptions to clear the placeholder if necessary
1622            try:
1623                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1624            except ValueError:
1625                raise service_error(service_error.server_config, 
1626                        "Bad key type (%s)" % self.ssh_type)
1627
1628            user = req.get('user', None)
1629            if user == None:
1630                raise service_error(service_error.req, "No user")
1631
1632            master = req.get('master', None)
1633            if not master:
1634                raise service_error(service_error.req,
1635                        "No master testbed label")
1636            export_project = req.get('exportProject', None)
1637            if not export_project:
1638                raise service_error(service_error.req, "No export project")
1639           
1640            if self.splitter_url:
1641                self.log.debug("Calling remote splitter at %s" % \
1642                        self.splitter_url)
1643                split_data = self.remote_splitter(self.splitter_url,
1644                        file_content, master)
1645            else:
1646                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1647                    str(self.muxmax), '-m', master]
1648
1649                if self.fedkit:
1650                    tclcmd.append('-k')
1651
1652                tclcmd.extend([pid, gid, eid, tclfile])
1653
1654                self.log.debug("running local splitter %s", " ".join(tclcmd))
1655                tclparser = Popen(tclcmd, stdout=PIPE)
1656                split_data = tclparser.stdout
1657
1658            allocated = { }         # Testbeds we can access
1659            started = { }           # Testbeds where a sub-experiment started
1660                                # successfully
1661
1662            # Objects to parse the splitter output (defined above)
1663            parse_current_testbed = self.current_testbed(eid, tmpdir,
1664                    self.fedkit)
1665            parse_allbeds = self.allbeds(self.get_access)
1666            parse_gateways = self.gateways(eid, master, tmpdir,
1667                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1668                    self.fedkit)
1669            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1670                        "^#\s+End\s+Vtopo")
1671            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1672                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1673            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1674                    "^#\s+End\s+tarfiles")
1675            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1676                    "^#\s+End\s+rpms")
1677
1678            # Working on the split data
1679            for line in split_data:
1680                line = line.rstrip()
1681                if parse_current_testbed(line, master, allocated, tbparams):
1682                    continue
1683                elif parse_allbeds(line, user, tbparams, master, export_project,
1684                        access_user):
1685                    continue
1686                elif parse_gateways(line, allocated, tbparams):
1687                    continue
1688                elif parse_vtopo(line):
1689                    continue
1690                elif parse_hostnames(line):
1691                    continue
1692                elif parse_tarfiles(line):
1693                    continue
1694                elif parse_rpms(line):
1695                    continue
1696                else:
1697                    raise service_error(service_error.internal, 
1698                            "Bad tcl parse? %s" % line)
1699            # Virtual topology and visualization
1700            vtopo = self.gentopo(parse_vtopo.str)
1701            if not vtopo:
1702                raise service_error(service_error.internal, 
1703                        "Failed to generate virtual topology")
1704
1705            vis = self.genviz(vtopo)
1706            if not vis:
1707                raise service_error(service_error.internal, 
1708                        "Failed to generate visualization")
1709           
1710            # save federant information
1711            for k in allocated.keys():
1712                tbparams[k]['federant'] = {\
1713                        'name': [ { 'localname' : eid} ],\
1714                        'emulab': tbparams[k]['emulab'],\
1715                        'allocID' : tbparams[k]['allocID'],\
1716                        'master' : k == master,\
1717                    }
1718
1719
1720            # Copy tarfiles and rpms needed at remote sites into a staging area
1721            try:
1722                if self.fedkit:
1723                    parse_tarfiles.list.append(self.fedkit)
1724                for t in parse_tarfiles.list:
1725                    if not os.path.exists("%s/tarfiles" % tmpdir):
1726                        os.mkdir("%s/tarfiles" % tmpdir)
1727                    self.copy_file(t, "%s/tarfiles/%s" % \
1728                            (tmpdir, os.path.basename(t)))
1729                for r in parse_rpms.list:
1730                    if not os.path.exists("%s/rpms" % tmpdir):
1731                        os.mkdir("%s/rpms" % tmpdir)
1732                    self.copy_file(r, "%s/rpms/%s" % \
1733                            (tmpdir, os.path.basename(r)))
1734            except IOError, e:
1735                raise service_error(service_error.internal, 
1736                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1737
1738        except service_error, e:
1739            # If something goes wrong in the parse (usually an access error)
1740            # clear the placeholder state.  From here on out the code delays
1741            # exceptions.
1742            self.state_lock.acquire()
1743            del self.state[eid]
1744            self.state_lock.release()
1745            raise e
1746
1747        thread_pool = self.thread_pool(self.nthreads)
1748        threads = [ ]
1749
1750        for tb in [ k for k in allocated.keys() if k != master]:
1751            # Create and start a thread to start the segment, and save it to
1752            # get the return value later
1753            thread_pool.wait_for_slot()
1754            t  = self.pooled_thread(target=self.start_segment, 
1755                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1756                    pdata=thread_pool, trace_file=self.trace_file)
1757            threads.append(t)
1758            t.start()
1759
1760        # Wait until all finish
1761        thread_pool.wait_for_all_done()
1762
1763        # If none failed, start the master
1764        failed = [ t.getName() for t in threads if not t.rv ]
1765
1766        if len(failed) == 0:
1767            if not self.start_segment(master, eid, tbparams, tmpdir):
1768                failed.append(master)
1769
1770        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1771        # If one failed clean up, unless fail_soft is set
1772        if failed:
1773            if not fail_soft:
1774                thread_pool.clear()
1775                for tb in succeeded:
1776                    # Create and start a thread to stop the segment
1777                    thread_pool.wait_for_slot()
1778                    t  = self.pooled_thread(target=self.stop_segment, 
1779                            args=(tb, eid, tbparams), name=tb,
1780                            pdata=thread_pool, trace_file=self.trace_file)
1781                    t.start()
1782                # Wait until all finish
1783                thread_pool.wait_for_all_done()
1784
1785                # release the allocations
1786                for tb in tbparams.keys():
1787                    self.release_access(tb, tbparams[tb]['allocID'])
1788                # Remove the placeholder
1789                self.state_lock.acquire()
1790                del self.state[eid]
1791                self.state_lock.release()
1792
1793                raise service_error(service_error.federant,
1794                    "Swap in failed on %s" % ",".join(failed))
1795        else:
1796            self.log.info("[start_segment]: Experiment %s started" % eid)
1797
1798        # Generate an ID for the experiment (slice) and a certificate that the
1799        # allocator can use to prove they own it.  We'll ship it back through
1800        # the encrypted connection.
1801        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1802
1803        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1804
1805        # Walk up tmpdir, deleting as we go
1806        for path, dirs, files in os.walk(tmpdir, topdown=False):
1807            for f in files:
1808                os.remove(os.path.join(path, f))
1809            for d in dirs:
1810                os.rmdir(os.path.join(path, d))
1811        os.rmdir(tmpdir)
1812
1813        # The deepcopy prevents the allocation ID and other binaries from being
1814        # translated into other formats
1815        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1816                for tb in tbparams.keys() \
1817                    if tbparams[tb].has_key('federant') ],\
1818                    'vtopo': vtopo,\
1819                    'vis' : vis,
1820                    'experimentID' : [\
1821                            { 'fedid': copy.copy(expid) }, \
1822                            { 'localname': eid },\
1823                        ],\
1824                    'experimentAccess': { 'X509' : expcert },\
1825                }
1826        # remove the allocationID info from each federant
1827        for f in resp['federant']:
1828            if f.has_key('allocID'): del f['allocID']
1829
1830        # Insert the experiment into our state and update the disk copy
1831        self.state_lock.acquire()
1832        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1833                for tb in tbparams.keys() \
1834                    if tbparams[tb].has_key('federant') ],\
1835                    'vtopo': vtopo,\
1836                    'vis' : vis,
1837                    'owner': fid,
1838                    'experimentID' : [\
1839                            { 'fedid': expid }, { 'localname': eid },\
1840                        ],\
1841                }
1842        self.state[eid] = self.state[expid]
1843        if self.state_filename: self.write_state()
1844        self.state_lock.release()
1845
1846        self.auth.set_attribute(fid, expid)
1847        self.auth.set_attribute(expid, expid)
1848
1849        if not failed:
1850            return resp
1851        else:
1852            raise service_error(service_error.partial, \
1853                    "Partial swap in on %s" % ",".join(succeeded))
1854
1855    def check_experiment_access(self, fid, key):
1856        """
1857        Confirm that the fid has access to the experiment.  Though a request
1858        may be made in terms of a local name, the access attribute is always
1859        the experiment's fedid.
1860        """
1861        if not isinstance(key, fedid):
1862            self.state_lock.acquire()
1863            if self.state.has_key(key):
1864                try:
1865                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1866                            if f.has_key('fedid') ]
1867                except KeyError:
1868                    self.state_lock.release()
1869                    raise service_error(service_error.internal, 
1870                            "No fedid for experiment %s when checking " +\
1871                                    "access(!?)" % key)
1872                if len(kl) == 1:
1873                    key = kl[0]
1874                else:
1875                    self.state_lock.release()
1876                    raise service_error(service_error.internal, 
1877                            "multiple fedids for experiment %s when " +\
1878                                    "checking access(!?)" % key)
1879            else:
1880                self.state_lock.release()
1881                raise service_error(service_error.access, "Access Denied")
1882            self.state_lock.release()
1883
1884        if self.auth.check_attribute(fid, key):
1885            return True
1886        else:
1887            raise service_error(service_error.access, "Access Denied")
1888
1889
1890
1891    def get_vtopo(self, req, fid):
1892        """
1893        Return the stored virtual topology for this experiment
1894        """
1895        rv = None
1896
1897        req = req.get('VtopoRequestBody', None)
1898        if not req:
1899            raise service_error(service_error.req,
1900                    "Bad request format (no VtopoRequestBody)")
1901        exp = req.get('experiment', None)
1902        if exp:
1903            if exp.has_key('fedid'):
1904                key = exp['fedid']
1905                keytype = "fedid"
1906            elif exp.has_key('localname'):
1907                key = exp['localname']
1908                keytype = "localname"
1909            else:
1910                raise service_error(service_error.req, "Unknown lookup type")
1911        else:
1912            raise service_error(service_error.req, "No request?")
1913
1914        self.check_experiment_access(fid, key)
1915
1916        self.state_lock.acquire()
1917        if self.state.has_key(key):
1918            rv = { 'experiment' : {keytype: key },\
1919                    'vtopo': self.state[key]['vtopo'],\
1920                }
1921        self.state_lock.release()
1922
1923        if rv: return rv
1924        else: raise service_error(service_error.req, "No such experiment")
1925
1926    def get_vis(self, req, fid):
1927        """
1928        Return the stored visualization for this experiment
1929        """
1930        rv = None
1931
1932        req = req.get('VisRequestBody', None)
1933        if not req:
1934            raise service_error(service_error.req,
1935                    "Bad request format (no VisRequestBody)")
1936        exp = req.get('experiment', None)
1937        if exp:
1938            if exp.has_key('fedid'):
1939                key = exp['fedid']
1940                keytype = "fedid"
1941            elif exp.has_key('localname'):
1942                key = exp['localname']
1943                keytype = "localname"
1944            else:
1945                raise service_error(service_error.req, "Unknown lookup type")
1946        else:
1947            raise service_error(service_error.req, "No request?")
1948
1949        self.check_experiment_access(fid, key)
1950
1951        self.state_lock.acquire()
1952        if self.state.has_key(key):
1953            rv =  { 'experiment' : {keytype: key },\
1954                    'vis': self.state[key]['vis'],\
1955                    }
1956        self.state_lock.release()
1957
1958        if rv: return rv
1959        else: raise service_error(service_error.req, "No such experiment")
1960
1961    def get_info(self, req, fid):
1962        """
1963        Return all the stored info about this experiment
1964        """
1965        rv = None
1966
1967        req = req.get('InfoRequestBody', None)
1968        if not req:
1969            raise service_error(service_error.req,
1970                    "Bad request format (no VisRequestBody)")
1971        exp = req.get('experiment', None)
1972        if exp:
1973            if exp.has_key('fedid'):
1974                key = exp['fedid']
1975                keytype = "fedid"
1976            elif exp.has_key('localname'):
1977                key = exp['localname']
1978                keytype = "localname"
1979            else:
1980                raise service_error(service_error.req, "Unknown lookup type")
1981        else:
1982            raise service_error(service_error.req, "No request?")
1983
1984        self.check_experiment_access(fid, key)
1985
1986        # The state may be massaged by the service function that called
1987        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1988        # state.
1989        self.state_lock.acquire()
1990        if self.state.has_key(key):
1991            rv = copy.deepcopy(self.state[key])
1992        self.state_lock.release()
1993        # Remove the owner info
1994        del rv['owner']
1995        # remove the allocationID info from each federant
1996        for f in rv['federant']:
1997            if f.has_key('allocID'): del f['allocID']
1998
1999        if rv: return rv
2000        else: raise service_error(service_error.req, "No such experiment")
2001
2002
2003    def terminate_experiment(self, req, fid):
2004        """
2005        Swap this experiment out on the federants and delete the shared
2006        information
2007        """
2008        tbparams = { }
2009        req = req.get('TerminateRequestBody', None)
2010        if not req:
2011            raise service_error(service_error.req,
2012                    "Bad request format (no TerminateRequestBody)")
2013        exp = req.get('experiment', None)
2014        if exp:
2015            if exp.has_key('fedid'):
2016                key = exp['fedid']
2017                keytype = "fedid"
2018            elif exp.has_key('localname'):
2019                key = exp['localname']
2020                keytype = "localname"
2021            else:
2022                raise service_error(service_error.req, "Unknown lookup type")
2023        else:
2024            raise service_error(service_error.req, "No request?")
2025
2026        self.check_experiment_access(fid, key)
2027
2028        self.state_lock.acquire()
2029        fed_exp = self.state.get(key, None)
2030
2031        if fed_exp:
2032            # This branch of the conditional holds the lock to generate a
2033            # consistent temporary tbparams variable to deallocate experiments.
2034            # It releases the lock to do the deallocations and reacquires it to
2035            # remove the experiment state when the termination is complete.
2036            ids = []
2037            #  experimentID is a list of dicts that are self-describing
2038            #  identifiers.  This finds all the fedids and localnames - the
2039            #  keys of self.state - and puts them into ids.
2040            for id in fed_exp.get('experimentID', []):
2041                if id.has_key('fedid'): ids.append(id['fedid'])
2042                if id.has_key('localname'): ids.append(id['localname'])
2043
2044            # Construct enough of the tbparams to make the stop_segment calls
2045            # work
2046            for fed in fed_exp['federant']:
2047                try:
2048                    for e in fed['name']:
2049                        eid = e.get('localname', None)
2050                        if eid: break
2051                    else:
2052                        continue
2053
2054                    p = fed['emulab']['project']
2055
2056                    project = p['name']['localname']
2057                    tb = p['testbed']['localname']
2058                    user = p['user'][0]['userID']['localname']
2059
2060                    domain = fed['emulab']['domain']
2061                    host  = fed['emulab']['ops']
2062                    aid = fed['allocID']
2063                except KeyError, e:
2064                    continue
2065                tbparams[tb] = {\
2066                        'user': user,\
2067                        'domain': domain,\
2068                        'project': project,\
2069                        'host': host,\
2070                        'eid': eid,\
2071                        'aid': aid,\
2072                    }
2073            self.state_lock.release()
2074
2075            # Stop everyone.
2076            thread_pool = self.thread_pool(self.nthreads)
2077            for tb in tbparams.keys():
2078                # Create and start a thread to stop the segment
2079                thread_pool.wait_for_slot()
2080                t  = self.pooled_thread(target=self.stop_segment, 
2081                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2082                        pdata=thread_pool, trace_file=self.trace_file)
2083                t.start()
2084            # Wait for completions
2085            thread_pool.wait_for_all_done()
2086
2087            # release the allocations
2088            for tb in tbparams.keys():
2089                self.release_access(tb, tbparams[tb]['aid'])
2090
2091            # Remove the terminated experiment
2092            self.state_lock.acquire()
2093            for id in ids:
2094                if self.state.has_key(id): del self.state[id]
2095
2096            if self.state_filename: self.write_state()
2097            self.state_lock.release()
2098
2099            return { 'experiment': exp }
2100        else:
2101            # Don't forget to release the lock
2102            self.state_lock.release()
2103            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.