source: fedd/federation/experiment_control.py @ 222290f

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 222290f was 222290f, checked in by Ted Faber <faber@…>, 15 years ago

Add gatewaykit, which allows the testbed admins to put other software on
gateway nodes - for example, seer - that's put on experiment nodes
automatically. This mechanism may not be the final one to distribute such
code.

  • Property mode set to 100644
File size: 64.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self, nthreads):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53            self.nthreads = nthreads
54
55        def acquire(self):
56            """
57            Get the pool's lock.
58            """
59            self.changed.acquire()
60
61        def release(self):
62            """
63            Release the pool's lock.
64            """
65            self.changed.release()
66
67        def wait(self, timeout = None):
68            """
69            Wait for a pool thread to start or stop.
70            """
71            self.changed.wait(timeout)
72
73        def start(self):
74            """
75            Called by a pool thread to report starting.
76            """
77            self.changed.acquire()
78            self.started += 1
79            self.changed.notifyAll()
80            self.changed.release()
81
82        def terminate(self):
83            """
84            Called by a pool thread to report finishing.
85            """
86            self.changed.acquire()
87            self.terminated += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def clear(self):
92            """
93            Clear all pool data.
94            """
95            self.changed.acquire()
96            self.started = 0
97            self.terminated =0
98            self.changed.notifyAll()
99            self.changed.release()
100
101        def wait_for_slot(self):
102            """
103            Wait until we have a free slot to start another pooled thread
104            """
105            self.acquire()
106            while self.started - self.terminated >= self.nthreads:
107                self.wait()
108            self.release()
109
110        def wait_for_all_done(self):
111            """
112            Wait until all active threads finish (and at least one has started)
113            """
114            self.acquire()
115            while self.started == 0 or self.started > self.terminated:
116                self.wait()
117            self.release()
118
119    class pooled_thread(Thread):
120        """
121        One of a set of threads dedicated to a specific task.  Uses the
122        thread_pool class above for coordination.
123        """
124        def __init__(self, group=None, target=None, name=None, args=(), 
125                kwargs={}, pdata=None, trace_file=None):
126            Thread.__init__(self, group, target, name, args, kwargs)
127            self.rv = None          # Return value of the ops in this thread
128            self.exception = None   # Exception that terminated this thread
129            self.target=target      # Target function to run on start()
130            self.args = args        # Args to pass to target
131            self.kwargs = kwargs    # Additional kw args
132            self.pdata = pdata      # thread_pool for this class
133            # Logger for this thread
134            self.log = logging.getLogger("fedd.experiment_control")
135       
136        def run(self):
137            """
138            Emulate Thread.run, except add pool data manipulation and error
139            logging.
140            """
141            if self.pdata:
142                self.pdata.start()
143
144            if self.target:
145                try:
146                    self.rv = self.target(*self.args, **self.kwargs)
147                except service_error, s:
148                    self.exception = s
149                    self.log.error("Thread exception: %s %s" % \
150                            (s.code_string(), s.desc))
151                except:
152                    self.exception = sys.exc_info()[1]
153                    self.log.error(("Unexpected thread exception: %s" +\
154                            "Trace %s") % (self.exception,\
155                                traceback.format_exc()))
156            if self.pdata:
157                self.pdata.terminate()
158
159    call_RequestAccess = service_caller('RequestAccess')
160    call_ReleaseAccess = service_caller('ReleaseAccess')
161    call_Ns2Split = service_caller('Ns2Split')
162
163    def __init__(self, config=None, auth=None):
164        """
165        Intialize the various attributes, most from the config object
166        """
167        self.thread_with_rv = experiment_control_local.pooled_thread
168        self.thread_pool = experiment_control_local.thread_pool
169
170        self.cert_file = config.get("experiment_control", "cert_file")
171        if self.cert_file:
172            self.cert_pwd = config.get("experiment_control", "cert_pwd")
173        else:
174            self.cert_file = config.get("globals", "cert_file")
175            self.cert_pwd = config.get("globals", "cert_pwd")
176
177        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
178                or config.get("globals", "trusted_certs")
179
180        self.exp_stem = "fed-stem"
181        self.log = logging.getLogger("fedd.experiment_control")
182        set_log_level(config, "experiment_control", self.log)
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193
194        self.debug = config.getboolean("experiment_control", "create_debug")
195        self.state_filename = config.get("experiment_control", 
196                "experiment_state")
197        self.splitter_url = config.get("experiment_control", "splitter_uri")
198        self.fedkit = config.get("experiment_control", "fedkit")
199        self.gatewaykit = config.get("experiment_control", "gatewaykit")
200        accessdb_file = config.get("experiment_control", "accessdb")
201
202        self.ssh_pubkey_file = config.get("experiment_control", 
203                "ssh_pubkey_file")
204        self.ssh_privkey_file = config.get("experiment_control",
205                "ssh_privkey_file")
206        # NB for internal master/slave ops, not experiment setup
207        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
208        self.state = { }
209        self.state_lock = Lock()
210        self.tclsh = "/usr/local/bin/otclsh"
211        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
212                config.get("experiment_control", "tcl_splitter",
213                        "/usr/testbed/lib/ns2ir/parse.tcl")
214        mapdb_file = config.get("experiment_control", "mapdb")
215        self.trace_file = sys.stderr
216
217        self.def_expstart = \
218                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
219                "/tmp/federate";
220        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
221                "FEDDIR/hosts";
222        self.def_gwstart = \
223                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
224                "/tmp/bridge.log";
225        self.def_mgwstart = \
226                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
227                "/tmp/bridge.log";
228        self.def_gwimage = "FBSD61-TUNNEL2";
229        self.def_gwtype = "pc";
230        self.local_access = { }
231
232        if auth:
233            self.auth = auth
234        else:
235            self.log.error(\
236                    "[access]: No authorizer initialized, creating local one.")
237            auth = authorizer()
238
239
240        if self.ssh_pubkey_file:
241            try:
242                f = open(self.ssh_pubkey_file, 'r')
243                self.ssh_pubkey = f.read()
244                f.close()
245            except IOError:
246                raise service_error(service_error.internal,
247                        "Cannot read sshpubkey")
248        else:
249            raise service_error(service_error.internal, 
250                    "No SSH public key file?")
251
252        if not self.ssh_privkey_file:
253            raise service_error(service_error.internal, 
254                    "No SSH public key file?")
255
256
257        if mapdb_file:
258            self.read_mapdb(mapdb_file)
259        else:
260            self.log.warn("[experiment_control] No testbed map, using defaults")
261            self.tbmap = { 
262                    'deter':'https://users.isi.deterlab.net:23235',
263                    'emulab':'https://users.isi.deterlab.net:23236',
264                    'ucb':'https://users.isi.deterlab.net:23237',
265                    }
266
267        if accessdb_file:
268                self.read_accessdb(accessdb_file)
269        else:
270            raise service_error(service_error.internal,
271                    "No accessdb specified in config")
272
273        # Grab saved state.  OK to do this w/o locking because it's read only
274        # and only one thread should be in existence that can see self.state at
275        # this point.
276        if self.state_filename:
277            self.read_state()
278
279        # Dispatch tables
280        self.soap_services = {\
281                'Create': soap_handler('Create', self.create_experiment),
282                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
283                'Vis': soap_handler('Vis', self.get_vis),
284                'Info': soap_handler('Info', self.get_info),
285                'Terminate': soap_handler('Terminate', 
286                    self.terminate_experiment),
287        }
288
289        self.xmlrpc_services = {\
290                'Create': xmlrpc_handler('Create', self.create_experiment),
291                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
292                'Vis': xmlrpc_handler('Vis', self.get_vis),
293                'Info': xmlrpc_handler('Info', self.get_info),
294                'Terminate': xmlrpc_handler('Terminate',
295                    self.terminate_experiment),
296        }
297
298    def copy_file(self, src, dest, size=1024):
299        """
300        Exceedingly simple file copy.
301        """
302        s = open(src,'r')
303        d = open(dest, 'w')
304
305        buf = "x"
306        while buf != "":
307            buf = s.read(size)
308            d.write(buf)
309        s.close()
310        d.close()
311
312    # Call while holding self.state_lock
313    def write_state(self):
314        """
315        Write a new copy of experiment state after copying the existing state
316        to a backup.
317
318        State format is a simple pickling of the state dictionary.
319        """
320        if os.access(self.state_filename, os.W_OK):
321            self.copy_file(self.state_filename, \
322                    "%s.bak" % self.state_filename)
323        try:
324            f = open(self.state_filename, 'w')
325            pickle.dump(self.state, f)
326        except IOError, e:
327            self.log.error("Can't write file %s: %s" % \
328                    (self.state_filename, e))
329        except pickle.PicklingError, e:
330            self.log.error("Pickling problem: %s" % e)
331        except TypeError, e:
332            self.log.error("Pickling problem (TypeError): %s" % e)
333
334    # Call while holding self.state_lock
335    def read_state(self):
336        """
337        Read a new copy of experiment state.  Old state is overwritten.
338
339        State format is a simple pickling of the state dictionary.
340        """
341        try:
342            f = open(self.state_filename, "r")
343            self.state = pickle.load(f)
344            self.log.debug("[read_state]: Read state from %s" % \
345                    self.state_filename)
346        except IOError, e:
347            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
348                    % (self.state_filename, e))
349        except pickle.UnpicklingError, e:
350            self.log.warning(("[read_state]: No saved state: " + \
351                    "Unpickling failed: %s") % e)
352       
353        for k in self.state.keys():
354            try:
355                # This list should only have one element in it, but phrasing it
356                # as a for loop doesn't cost much, really.  We have to find the
357                # fedid elements anyway.
358                for eid in [ f['fedid'] \
359                        for f in self.state[k]['experimentID']\
360                            if f.has_key('fedid') ]:
361                    self.auth.set_attribute(self.state[k]['owner'], eid)
362            except KeyError, e:
363                self.log.warning("[read_state]: State ownership or identity " +\
364                        "misformatted in %s: %s" % (self.state_filename, e))
365
366
367    def read_accessdb(self, accessdb_file):
368        """
369        Read the mapping from fedids that can create experiments to their name
370        in the 3-level access namespace.  All will be asserted from this
371        testbed and can include the local username and porject that will be
372        asserted on their behalf by this fedd.  Each fedid is also added to the
373        authorization system with the "create" attribute.
374        """
375        self.accessdb = {}
376        # These are the regexps for parsing the db
377        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
378        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
379                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
380        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
381                "\s*->\s*(" + name_expr + ")\s*$")
382        lineno = 0
383
384        # Parse the mappings and store in self.authdb, a dict of
385        # fedid -> (proj, user)
386        try:
387            f = open(accessdb_file, "r")
388            for line in f:
389                lineno += 1
390                line = line.strip()
391                if len(line) == 0 or line.startswith('#'):
392                    continue
393                m = project_line.match(line)
394                if m:
395                    fid = fedid(hexstr=m.group(1))
396                    project, user = m.group(2,3)
397                    if not self.accessdb.has_key(fid):
398                        self.accessdb[fid] = []
399                    self.accessdb[fid].append((project, user))
400                    continue
401
402                m = user_line.match(line)
403                if m:
404                    fid = fedid(hexstr=m.group(1))
405                    project = None
406                    user = m.group(2)
407                    if not self.accessdb.has_key(fid):
408                        self.accessdb[fid] = []
409                    self.accessdb[fid].append((project, user))
410                    continue
411                self.log.warn("[experiment_control] Error parsing access " +\
412                        "db %s at line %d" %  (accessdb_file, lineno))
413        except IOError:
414            raise service_error(service_error.internal,
415                    "Error opening/reading %s as experiment " +\
416                            "control accessdb" %  accessdb_file)
417        f.close()
418
419        # Initialize the authorization attributes
420        for fid in self.accessdb.keys():
421            self.auth.set_attribute(fid, 'create')
422
423    def read_mapdb(self, file):
424        """
425        Read a simple colon separated list of mappings for the
426        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
427        """
428
429        self.tbmap = { }
430        lineno =0
431        try:
432            f = open(file, "r")
433            for line in f:
434                lineno += 1
435                line = line.strip()
436                if line.startswith('#') or len(line) == 0:
437                    continue
438                try:
439                    label, url = line.split(':', 1)
440                    self.tbmap[label] = url
441                except ValueError, e:
442                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
443                            "map db: %s %s" % (lineno, line, e))
444        except IOError, e:
445            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
446                    "open %s: %s" % (file, e))
447        f.close()
448
449    def scp_file(self, file, user, host, dest=""):
450        """
451        scp a file to the remote host.  If debug is set the action is only
452        logged.
453        """
454
455        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
456                '-o', 'StrictHostKeyChecking yes', '-i', 
457                self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)]
458        rv = 0
459
460        try:
461            dnull = open("/dev/null", "w")
462        except IOError:
463            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
464            dnull = Null
465
466        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
467        if not self.debug:
468            rv = call(scp_cmd, stdout=dnull, stderr=dnull)
469
470        return rv == 0
471
472    def ssh_cmd(self, user, host, cmd, wname=None):
473        """
474        Run a remote command on host as user.  If debug is set, the action is
475        only logged.
476        """
477        sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \
478                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
479                (self.ssh_exec, self.ssh_privkey_file, 
480                        user, host, cmd)
481
482        try:
483            dnull = open("/dev/null", "r")
484        except IOError:
485            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
486            dnull = Null
487
488        self.log.debug("[ssh_cmd]: %s" % sh_str)
489        if not self.debug:
490            if dnull:
491                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
492            else:
493                sub = Popen(sh_str, shell=True)
494            return sub.wait() == 0
495        else:
496            return True
497
498    def ship_configs(self, host, user, src_dir, dest_dir):
499        """
500        Copy federant-specific configuration files to the federant.
501        """
502        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
503            return False
504        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
505            return False
506
507        for f in os.listdir(src_dir):
508            if os.path.isdir(f):
509                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
510                        "%s/%s" % (dest_dir, f)):
511                    return False
512            else:
513                if not self.scp_file("%s/%s" % (src_dir, f), 
514                        user, host, dest_dir):
515                    return False
516        return True
517
518    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
519        """
520        Start a sub-experiment on a federant.
521
522        Get the current state, modify or create as appropriate, ship data and
523        configs and start the experiment.  There are small ordering differences
524        based on the initial state of the sub-experiment.
525        """
526        # ops node in the federant
527        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
528        user = tbparams[tb]['user']     # federant user
529        pid = tbparams[tb]['project']   # federant project
530        # XXX
531        base_confs = ( "hosts",)
532        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
533        # command to test experiment state
534        expinfo_exec = "/usr/testbed/bin/expinfo" 
535        # Configuration directories on the remote machine
536        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
537        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
538        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
539        # Regular expressions to parse the expinfo response
540        state_re = re.compile("State:\s+(\w+)")
541        no_exp_re = re.compile("^No\s+such\s+experiment")
542        state = None    # Experiment state parsed from expinfo
543        # The expinfo ssh command.  Note the identity restriction to use only
544        # the identity provided in the pubkey given.
545        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
546                'StrictHostKeyChecking yes', '-i', 
547                self.ssh_privkey_file, "%s@%s" % (user, host), 
548                expinfo_exec, pid, eid]
549
550        # Get status
551        self.log.debug("[start_segment]: %s"% " ".join(cmd))
552        dev_null = None
553        try:
554            dev_null = open("/dev/null", "a")
555        except IOError, e:
556            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
557
558        if self.debug:
559            state = 'swapped'
560            rv = 0
561        else:
562            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
563            for line in status.stdout:
564                m = state_re.match(line)
565                if m: state = m.group(1)
566                else:
567                    m = no_exp_re.match(line)
568                    if m: state = "none"
569            rv = status.wait()
570
571        # If the experiment is not present the subcommand returns a non-zero
572        # return value.  If we successfully parsed a "none" outcome, ignore the
573        # return code.
574        if rv != 0 and state != "none":
575            raise service_error(service_error.internal,
576                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
577
578        self.log.debug("[start_segment]: %s: %s" % (tb, state))
579        self.log.info("[start_segment]:transferring experiment to %s" % tb)
580
581        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
582            return False
583        # Clear the federation config dirs
584        if not self.ssh_cmd(user, host, 
585                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
586            return False
587        # Clear and create the tarfiles and rpm directories
588        for d in (tarfiles_dir, rpms_dir):
589            if not self.ssh_cmd(user, host, 
590                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
591                return False
592            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
593                    "create tarfiles"):
594                return False
595       
596        if state == 'active':
597            # Create the federation config dirs (do not move outside the
598            # conditional.  Happens later in new expriment creation)
599            if not self.ssh_cmd(user, host, 
600                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
601                return False
602            # Remote experiment is active.  Modify it.
603            for f in base_confs:
604                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
605                        "%s/%s" % (proj_dir, f)):
606                    return False
607            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
608                    proj_dir):
609                return False
610            if os.path.isdir("%s/tarfiles" % tmpdir):
611                if not self.ship_configs(host, user,
612                        "%s/tarfiles" % tmpdir, tarfiles_dir):
613                    return False
614            if os.path.isdir("%s/rpms" % tmpdir):
615                if not self.ship_configs(host, user,
616                        "%s/rpms" % tmpdir, tarfiles_dir):
617                    return False
618            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
619            if not self.ssh_cmd(user, host,
620                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
621                            (pid, eid, tclfile), "modexp"):
622                return False
623            return True
624        elif state == "swapped":
625            # Create the federation config dirs (do not move outside the
626            # conditional.  Happens later in new expriment creation)
627            if not self.ssh_cmd(user, host, 
628                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
629                return False
630            # Remote experiment swapped out.  Modify it and swap it in.
631            for f in base_confs:
632                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
633                        "%s/%s" % (proj_dir, f)):
634                    return False
635            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
636                    proj_dir):
637                return False
638            if os.path.isdir("%s/tarfiles" % tmpdir):
639                if not self.ship_configs(host, user,
640                        "%s/tarfiles" % tmpdir, tarfiles_dir):
641                    return False
642            if os.path.isdir("%s/rpms" % tmpdir):
643                if not self.ship_configs(host, user,
644                        "%s/rpms" % tmpdir, tarfiles_dir):
645                    return False
646            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
647            if not self.ssh_cmd(user, host,
648                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
649                    "modexp"):
650                return False
651            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
652            if not self.ssh_cmd(user, host,
653                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
654                    "swapexp"):
655                return False
656            return True
657        elif state == "none":
658            # No remote experiment.  Create one.  We do this in 2 steps so we
659            # can put the configuration files and scripts into the new
660            # experiment directories.
661
662            # Tarfiles must be present for creation to work
663            if os.path.isdir("%s/tarfiles" % tmpdir):
664                if not self.ship_configs(host, user,
665                        "%s/tarfiles" % tmpdir, tarfiles_dir):
666                    return False
667            if os.path.isdir("%s/rpms" % tmpdir):
668                if not self.ship_configs(host, user,
669                        "%s/rpms" % tmpdir, tarfiles_dir):
670                    return False
671            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
672            if not self.ssh_cmd(user, host,
673                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
674                            (pid, eid, tclfile), "startexp"):
675                return False
676            # Create the federation config dirs (do not move outside the
677            # conditional.)
678            if not self.ssh_cmd(user, host, 
679                    "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
680                return False
681            # After startexp the per-experiment directories exist
682            for f in base_confs:
683                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
684                        "%s/%s" % (proj_dir, f)):
685                    return False
686            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
687                    proj_dir):
688                return False
689            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
690            if not self.ssh_cmd(user, host,
691                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
692                    "swapexp"):
693                return False
694            return True
695        else:
696            self.log.debug("[start_segment]:unknown state %s" % state)
697            return False
698
699    def stop_segment(self, tb, eid, tbparams):
700        """
701        Stop a sub experiment by calling swapexp on the federant
702        """
703        user = tbparams[tb]['user']
704        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
705        pid = tbparams[tb]['project']
706
707        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
708        return self.ssh_cmd(user, host,
709                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
710
711       
712    def generate_ssh_keys(self, dest, type="rsa" ):
713        """
714        Generate a set of keys for the gateways to use to talk.
715
716        Keys are of type type and are stored in the required dest file.
717        """
718        valid_types = ("rsa", "dsa")
719        t = type.lower();
720        if t not in valid_types: raise ValueError
721        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
722
723        try:
724            trace = open("/dev/null", "w")
725        except IOError:
726            raise service_error(service_error.internal,
727                    "Cannot open /dev/null??");
728
729        # May raise CalledProcessError
730        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
731        rv = call(cmd, stdout=trace, stderr=trace)
732        if rv != 0:
733            raise service_error(service_error.internal, 
734                    "Cannot generate nonce ssh keys.  %s return code %d" \
735                            % (self.ssh_keygen, rv))
736
737    def gentopo(self, str):
738        """
739        Generate the topology dtat structure from the splitter's XML
740        representation of it.
741
742        The topology XML looks like:
743            <experiment>
744                <nodes>
745                    <node><vname></vname><ips>ip1:ip2</ips></node>
746                </nodes>
747                <lans>
748                    <lan>
749                        <vname></vname><vnode></vnode><ip></ip>
750                        <bandwidth></bandwidth><member>node:port</member>
751                    </lan>
752                </lans>
753        """
754        class topo_parse:
755            """
756            Parse the topology XML and create the dats structure.
757            """
758            def __init__(self):
759                # Typing of the subelements for data conversion
760                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
761                self.int_subelements = ( 'bandwidth',)
762                self.float_subelements = ( 'delay',)
763                # The final data structure
764                self.nodes = [ ]
765                self.lans =  [ ]
766                self.topo = { \
767                        'node': self.nodes,\
768                        'lan' : self.lans,\
769                    }
770                self.element = { }  # Current element being created
771                self.chars = ""     # Last text seen
772
773            def end_element(self, name):
774                # After each sub element the contents is added to the current
775                # element or to the appropriate list.
776                if name == 'node':
777                    self.nodes.append(self.element)
778                    self.element = { }
779                elif name == 'lan':
780                    self.lans.append(self.element)
781                    self.element = { }
782                elif name in self.str_subelements:
783                    self.element[name] = self.chars
784                    self.chars = ""
785                elif name in self.int_subelements:
786                    self.element[name] = int(self.chars)
787                    self.chars = ""
788                elif name in self.float_subelements:
789                    self.element[name] = float(self.chars)
790                    self.chars = ""
791
792            def found_chars(self, data):
793                self.chars += data.rstrip()
794
795
796        tp = topo_parse();
797        parser = xml.parsers.expat.ParserCreate()
798        parser.EndElementHandler = tp.end_element
799        parser.CharacterDataHandler = tp.found_chars
800
801        parser.Parse(str)
802
803        return tp.topo
804       
805
806    def genviz(self, topo):
807        """
808        Generate the visualization the virtual topology
809        """
810
811        neato = "/usr/local/bin/neato"
812        # These are used to parse neato output and to create the visualization
813        # file.
814        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
815        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
816                "%s</type></node>"
817
818        try:
819            # Node names
820            nodes = [ n['vname'] for n in topo['node'] ]
821            topo_lans = topo['lan']
822        except KeyError:
823            raise service_error(service_error.internal, "Bad topology")
824
825        lans = { }
826        links = { }
827
828        # Walk through the virtual topology, organizing the connections into
829        # 2-node connections (links) and more-than-2-node connections (lans).
830        # When a lan is created, it's added to the list of nodes (there's a
831        # node in the visualization for the lan).
832        for l in topo_lans:
833            if links.has_key(l['vname']):
834                if len(links[l['vname']]) < 2:
835                    links[l['vname']].append(l['vnode'])
836                else:
837                    nodes.append(l['vname'])
838                    lans[l['vname']] = links[l['vname']]
839                    del links[l['vname']]
840                    lans[l['vname']].append(l['vnode'])
841            elif lans.has_key(l['vname']):
842                lans[l['vname']].append(l['vnode'])
843            else:
844                links[l['vname']] = [ l['vnode'] ]
845
846
847        # Open up a temporary file for dot to turn into a visualization
848        try:
849            df, dotname = tempfile.mkstemp()
850            dotfile = os.fdopen(df, 'w')
851        except IOError:
852            raise service_error(service_error.internal,
853                    "Failed to open file in genviz")
854
855        # Generate a dot/neato input file from the links, nodes and lans
856        try:
857            print >>dotfile, "graph G {"
858            for n in nodes:
859                print >>dotfile, '\t"%s"' % n
860            for l in links.keys():
861                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
862            for l in lans.keys():
863                for n in lans[l]:
864                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
865            print >>dotfile, "}"
866            dotfile.close()
867        except TypeError:
868            raise service_error(service_error.internal,
869                    "Single endpoint link in vtopo")
870        except IOError:
871            raise service_error(service_error.internal, "Cannot write dot file")
872
873        # Use dot to create a visualization
874        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
875                '-Gpack=true', dotname], stdout=PIPE)
876
877        # Translate dot to vis format
878        vis_nodes = [ ]
879        vis = { 'node': vis_nodes }
880        for line in dot.stdout:
881            m = vis_re.match(line)
882            if m:
883                vn = m.group(1)
884                vis_node = {'name': vn, \
885                        'x': float(m.group(2)),\
886                        'y' : float(m.group(3)),\
887                    }
888                if vn in links.keys() or vn in lans.keys():
889                    vis_node['type'] = 'lan'
890                else:
891                    vis_node['type'] = 'node'
892                vis_nodes.append(vis_node)
893        rv = dot.wait()
894
895        os.remove(dotname)
896        if rv == 0 : return vis
897        else: return None
898
899    def get_access(self, tb, nodes, user, tbparam, master, export_project,
900            access_user):
901        """
902        Get access to testbed through fedd and set the parameters for that tb
903        """
904
905        # XXX This is needless complexity.  It must vanish.
906        translate_attr = {
907            'slavenodestartcmd': 'expstart',
908            'slaveconnectorstartcmd': 'gwstart',
909            'masternodestartcmd': 'mexpstart',
910            'masterconnectorstartcmd': 'mgwstart',
911            'connectorimage': 'gwimage',
912            'connectortype': 'gwtype',
913            'tunnelcfg': 'tun',
914            'tunnelinterface': 'tunnelinterface',
915            'smbshare': 'smbshare',
916            'slaveconnectorcmd': 'gwcmd',
917            'slaveconnectorcmdparams': 'gwcmdparams',
918            'masterconnectorcmd': 'mgwcmd',
919            'masterconnectorcmdparams': 'mgwcmdparams',
920        }
921
922        uri = self.tbmap.get(tb, None)
923        if not uri:
924            raise service_error(serice_error.server_config, 
925                    "Unknown testbed: %s" % tb)
926
927        # currently this lumps all users into one service access group
928        service_keys = [ a for u in user \
929                for a in u.get('access', []) \
930                    if a.has_key('sshPubkey')]
931
932        if len(service_keys) == 0:
933            raise service_error(service_error.req, 
934                    "Must have at least one SSH pubkey for services")
935
936
937        for p, u in access_user:
938            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
939                    "to %s") %  ((p or "None"), u, uri))
940
941            if p:
942                # Request with user and project specified
943                req = {\
944                        'destinationTestbed' : { 'uri' : uri },
945                        'project': { 
946                            'name': {'localname': p},
947                            'user': [ {'userID': { 'localname': u } } ],
948                            },
949                        'user':  user,
950                        'allocID' : { 'localname': 'test' },
951                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
952                        'serviceAccess' : service_keys
953                    }
954            else:
955                # Request with only user specified
956                req = {\
957                        'destinationTestbed' : { 'uri' : uri },
958                        'user':  [ {'userID': { 'localname': u } } ],
959                        'allocID' : { 'localname': 'test' },
960                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
961                        'serviceAccess' : service_keys
962                    }
963
964            if tb == master:
965                # NB, the export_project parameter is a dict that includes
966                # the type
967                req['exportProject'] = export_project
968
969            # node resources if any
970            if nodes != None and len(nodes) > 0:
971                rnodes = [ ]
972                for n in nodes:
973                    rn = { }
974                    image, hw, count = n.split(":")
975                    if image: rn['image'] = [ image ]
976                    if hw: rn['hardware'] = [ hw ]
977                    if count and int(count) >0 : rn['count'] = int(count)
978                    rnodes.append(rn)
979                req['resources']= { }
980                req['resources']['node'] = rnodes
981
982            try:
983                if self.local_access.has_key(uri):
984                    # Local access call
985                    req = { 'RequestAccessRequestBody' : req }
986                    r = self.local_access[uri].RequestAccess(req, 
987                            fedid(file=self.cert_file))
988                    r = { 'RequestAccessResponseBody' : r }
989                else:
990                    r = self.call_RequestAccess(uri, req, 
991                            self.cert_file, self.cert_pwd, self.trusted_certs)
992            except service_error, e:
993                if e.code == service_error.access:
994                    self.log.debug("[get_access] Access denied")
995                    r = None
996                    continue
997                else:
998                    raise e
999
1000            if r.has_key('RequestAccessResponseBody'):
1001                # Through to here we have a valid response, not a fault.
1002                # Access denied is a fault, so something better or worse than
1003                # access denied has happened.
1004                r = r['RequestAccessResponseBody']
1005                self.log.debug("[get_access] Access granted")
1006                break
1007            else:
1008                raise service_error(service_error.protocol,
1009                        "Bad proxy response")
1010       
1011        if not r:
1012            raise service_error(service_error.access, 
1013                    "Access denied by %s (%s)" % (tb, uri))
1014
1015        e = r['emulab']
1016        p = e['project']
1017        tbparam[tb] = { 
1018                "boss": e['boss'],
1019                "host": e['ops'],
1020                "domain": e['domain'],
1021                "fs": e['fileServer'],
1022                "eventserver": e['eventServer'],
1023                "project": unpack_id(p['name']),
1024                "emulab" : e,
1025                "allocID" : r['allocID'],
1026                }
1027        # Make the testbed name be the label the user applied
1028        p['testbed'] = {'localname': tb }
1029
1030        for u in p['user']:
1031            role = u.get('role', None)
1032            if role == 'experimentCreation':
1033                tbparam[tb]['user'] = unpack_id(u['userID'])
1034                break
1035        else:
1036            raise service_error(service_error.internal, 
1037                    "No createExperimentUser from %s" %tb)
1038
1039        for a in e['fedAttr']:
1040            if a['attribute']:
1041                key = translate_attr.get(a['attribute'].lower(), None)
1042                if key:
1043                    tbparam[tb][key]= a['value']
1044       
1045    def release_access(self, tb, aid):
1046        """
1047        Release access to testbed through fedd
1048        """
1049
1050        uri = self.tbmap.get(tb, None)
1051        if not uri:
1052            raise service_error(serice_error.server_config, 
1053                    "Unknown testbed: %s" % tb)
1054
1055        if self.local_access.has_key(uri):
1056            resp = self.local_access[uri].ReleaseAccess(\
1057                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1058                    fedid(file=self.cert_file))
1059            resp = { 'ReleaseAccessResponseBody': resp } 
1060        else:
1061            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1062                    self.cert_file, self.cert_pwd, self.trusted_certs)
1063
1064        # better error coding
1065
1066    def remote_splitter(self, uri, desc, master):
1067
1068        req = {
1069                'description' : { 'ns2description': desc },
1070                'master': master,
1071                'include_fedkit': bool(self.fedkit),
1072                'include_gatewaykit': bool(self.gatewaykit)
1073            }
1074
1075        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1076                self.trusted_certs)
1077
1078        if r.has_key('Ns2SplitResponseBody'):
1079            r = r['Ns2SplitResponseBody']
1080            if r.has_key('output'):
1081                return r['output'].splitlines()
1082            else:
1083                raise service_error(service_error.protocol, 
1084                        "Bad splitter response (no output)")
1085        else:
1086            raise service_error(service_error.protocol, "Bad splitter response")
1087       
1088    class current_testbed:
1089        """
1090        Object for collecting the current testbed description.  The testbed
1091        description is saved to a file with the local testbed variables
1092        subsittuted line by line.
1093        """
1094        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1095            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1096            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1097            self.current_testbed = None
1098            self.testbed_file = None
1099
1100            self.def_expstart = \
1101                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1102            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1103            self.def_gwstart = \
1104                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1105            self.def_mgwstart = \
1106                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1107            self.def_gwimage = "FBSD61-TUNNEL2";
1108            self.def_gwtype = "pc";
1109            self.def_mgwcmd = '# '
1110            self.def_mgwcmdparams = ''
1111            self.def_gwcmd = '# '
1112            self.def_gwcmdparams = ''
1113
1114            self.eid = eid
1115            self.tmpdir = tmpdir
1116            self.fedkit = fedkit
1117            self.gatewaykit = gatewaykit
1118
1119        def __call__(self, line, master, allocated, tbparams):
1120            # Capture testbed topology descriptions
1121            if self.current_testbed == None:
1122                m = self.begin_testbed.match(line)
1123                if m != None:
1124                    self.current_testbed = m.group(1)
1125                    if self.current_testbed == None:
1126                        raise service_error(service_error.req,
1127                                "Bad request format (unnamed testbed)")
1128                    allocated[self.current_testbed] = \
1129                            allocated.get(self.current_testbed,0) + 1
1130                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1131                    if not os.path.exists(tb_dir):
1132                        try:
1133                            os.mkdir(tb_dir)
1134                        except IOError:
1135                            raise service_error(service_error.internal,
1136                                    "Cannot create %s" % tb_dir)
1137                    try:
1138                        self.testbed_file = open("%s/%s.%s.tcl" %
1139                                (tb_dir, self.eid, self.current_testbed), 'w')
1140                    except IOError:
1141                        self.testbed_file = None
1142                    return True
1143                else: return False
1144            else:
1145                m = self.end_testbed.match(line)
1146                if m != None:
1147                    if m.group(1) != self.current_testbed:
1148                        raise service_error(service_error.internal, 
1149                                "Mismatched testbed markers!?")
1150                    if self.testbed_file != None: 
1151                        self.testbed_file.close()
1152                        self.testbed_file = None
1153                    self.current_testbed = None
1154                elif self.testbed_file:
1155                    # Substitute variables and put the line into the local
1156                    # testbed file.
1157                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1158                            self.def_gwtype)
1159                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1160                            self.def_gwimage)
1161                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1162                            self.def_mgwstart)
1163                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1164                            self.def_mexpstart)
1165                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1166                            self.def_gwstart)
1167                    expstart = tbparams[self.current_testbed].get('expstart', 
1168                            self.def_expstart)
1169                    project = tbparams[self.current_testbed].get('project')
1170                    gwcmd = tbparams[self.current_testbed].get('gwcmd', 
1171                            self.def_gwcmd)
1172                    gwcmdparams = tbparams[self.current_testbed].get(\
1173                            'gwcmdparams', self.def_gwcmdparams)
1174                    mgwcmd = tbparams[self.current_testbed].get('mgwcmd', 
1175                            self.def_gwcmd)
1176                    mgwcmdparams = tbparams[self.current_testbed].get(\
1177                            'mgwcmdparams', self.def_gwcmdparams)
1178                    line = re.sub("GWTYPE", gwtype, line)
1179                    line = re.sub("GWIMAGE", gwimage, line)
1180                    if self.current_testbed == master:
1181                        line = re.sub("GWSTART", mgwstart, line)
1182                        line = re.sub("EXPSTART", mexpstart, line)
1183                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1184                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1185                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1186                    else:
1187                        line = re.sub("GWSTART", gwstart, line)
1188                        line = re.sub("EXPSTART", expstart, line)
1189                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1190                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1191                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1192                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1193                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1194                    line = re.sub("EID", self.eid, line)
1195                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1196                            (project, self.eid), line)
1197                    if self.fedkit:
1198                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1199                                line)
1200                    if self.gatewaykit:
1201                        line = re.sub("GATEWAYKIT",
1202                                os.path.basename(self.gatewaykit), line)
1203                    print >>self.testbed_file, line
1204                return True
1205
1206    class allbeds:
1207        """
1208        Process the Allbeds section.  Get access to each federant and save the
1209        parameters in tbparams
1210        """
1211        def __init__(self, get_access):
1212            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1213            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1214            self.in_allbeds = False
1215            self.get_access = get_access
1216
1217        def __call__(self, line, user, tbparams, master, export_project,
1218                access_user):
1219            # Testbed access parameters
1220            if not self.in_allbeds:
1221                if self.begin_allbeds.match(line):
1222                    self.in_allbeds = True
1223                    return True
1224                else:
1225                    return False
1226            else:
1227                if self.end_allbeds.match(line):
1228                    self.in_allbeds = False
1229                else:
1230                    nodes = line.split('|')
1231                    tb = nodes.pop(0)
1232                    self.get_access(tb, nodes, user, tbparams, master,
1233                            export_project, access_user)
1234                return True
1235
1236    class gateways:
1237        def __init__(self, eid, master, tmpdir, gw_pubkey,
1238                gw_secretkey, copy_file, fedkit):
1239            self.begin_gateways = \
1240                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1241            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1242            self.current_gateways = None
1243            self.control_gateway = None
1244            self.active_end = { }
1245
1246            self.eid = eid
1247            self.master = master
1248            self.tmpdir = tmpdir
1249            self.gw_pubkey_base = gw_pubkey
1250            self.gw_secretkey_base = gw_secretkey
1251
1252            self.copy_file = copy_file
1253            self.fedkit = fedkit
1254
1255
1256        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1257                active_end, tbparams, dtb, myname, desthost, type):
1258            """
1259            Produce a gateway configuration file from a gateways line.
1260            """
1261
1262            sproject = tbparams[gw].get('project', 'project')
1263            dproject = tbparams[dtb].get('project', 'project')
1264            sdomain = ".%s.%s%s" % (eid, sproject,
1265                    tbparams[gw].get('domain', ".example.com"))
1266            ddomain = ".%s.%s%s" % (eid, dproject,
1267                    tbparams[dtb].get('domain', ".example.com"))
1268            boss = tbparams[master].get('boss', "boss")
1269            fs = tbparams[master].get('fs', "fs")
1270            event_server = "%s%s" % \
1271                    (tbparams[gw].get('eventserver', "event_server"),
1272                            tbparams[gw].get('domain', "example.com"))
1273            remote_event_server = "%s%s" % \
1274                    (tbparams[dtb].get('eventserver', "event_server"),
1275                            tbparams[dtb].get('domain', "example.com"))
1276            seer_control = "%s%s" % \
1277                    (tbparams[gw].get('control', "control"), sdomain)
1278            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1279
1280            if self.fedkit:
1281                remote_script_dir = "/usr/local/federation/bin"
1282                local_script_dir = "/usr/local/federation/bin"
1283            else:
1284                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1285                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1286
1287            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1288            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1289            tunnel_cfg = tbparams[gw].get("tun", "false")
1290
1291            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1292            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1293
1294            # translate to lower case so the `hostname` hack for specifying
1295            # configuration files works.
1296            conf_file = conf_file.lower();
1297            remote_conf_file = remote_conf_file.lower();
1298
1299            if dtb == master:
1300                active = "false"
1301            elif gw == master:
1302                active = "true"
1303            elif active_end.has_key('%s-%s' % (dtb, gw)):
1304                active = "false"
1305            else:
1306                active_end['%s-%s' % (gw, dtb)] = 1
1307                active = "true"
1308
1309            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1310            print >>gwconfig, "Active: %s" % active
1311            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1312            if tunnel_iface:
1313                print >>gwconfig, "Interface: %s" % tunnel_iface
1314            print >>gwconfig, "BossName: %s" % boss
1315            print >>gwconfig, "FsName: %s" % fs
1316            print >>gwconfig, "EventServerName: %s" % event_server
1317            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1318            print >>gwconfig, "SeerControl: %s" % seer_control
1319            print >>gwconfig, "Type: %s" % type
1320            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1321            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1322                    local_script_dir
1323            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1324            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1325            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1326                    (remote_conf_dir, remote_conf_file)
1327            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1328            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1329            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1330            gwconfig.close()
1331
1332            return active == "true"
1333
1334        def __call__(self, line, allocated, tbparams):
1335            # Process gateways
1336            if not self.current_gateways:
1337                m = self.begin_gateways.match(line)
1338                if m:
1339                    self.current_gateways = m.group(1)
1340                    if allocated.has_key(self.current_gateways):
1341                        # This test should always succeed
1342                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1343                        if not os.path.exists(tb_dir):
1344                            try:
1345                                os.mkdir(tb_dir)
1346                            except IOError:
1347                                raise service_error(service_error.internal,
1348                                        "Cannot create %s" % tb_dir)
1349                    else:
1350                        # XXX
1351                        self.log.error("[gateways]: Ignoring gateways for " + \
1352                                "unknown testbed %s" % self.current_gateways)
1353                        self.current_gateways = None
1354                    return True
1355                else:
1356                    return False
1357            else:
1358                m = self.end_gateways.match(line)
1359                if m :
1360                    if m.group(1) != self.current_gateways:
1361                        raise service_error(service_error.internal,
1362                                "Mismatched gateway markers!?")
1363                    if self.control_gateway:
1364                        try:
1365                            cc = open("%s/%s/client.conf" %
1366                                    (self.tmpdir, self.current_gateways), 'w')
1367                            print >>cc, "ControlGateway: %s" % \
1368                                    self.control_gateway
1369                            if tbparams[self.master].has_key('smbshare'):
1370                                print >>cc, "SMBSHare: %s" % \
1371                                        tbparams[self.master]['smbshare']
1372                            print >>cc, "ProjectUser: %s" % \
1373                                    tbparams[self.master]['user']
1374                            print >>cc, "ProjectName: %s" % \
1375                                    tbparams[self.master]['project']
1376                            print >>cc, "ExperimentID: %s/%s" % \
1377                                    ( tbparams[self.master]['project'], \
1378                                    self.eid )
1379                            cc.close()
1380                        except IOError:
1381                            raise service_error(service_error.internal,
1382                                    "Error creating client config")
1383                        # XXX: This seer specific file should disappear
1384                        try:
1385                            cc = open("%s/%s/seer.conf" %
1386                                    (self.tmpdir, self.current_gateways),
1387                                    'w')
1388                            if self.current_gateways != self.master:
1389                                print >>cc, "ControlNode: %s" % \
1390                                        self.control_gateway
1391                            print >>cc, "ExperimentID: %s/%s" % \
1392                                    ( tbparams[self.master]['project'], \
1393                                    self.eid )
1394                            cc.close()
1395                        except IOError:
1396                            raise service_error(service_error.internal,
1397                                    "Error creating seer config")
1398                    else:
1399                        debug.error("[gateways]: No control gateway for %s" %\
1400                                    self.current_gateways)
1401                    self.current_gateways = None
1402                else:
1403                    dtb, myname, desthost, type = line.split(" ")
1404
1405                    if type == "control" or type == "both":
1406                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1407                                self.eid, 
1408                                tbparams[self.current_gateways]['project'],
1409                                tbparams[self.current_gateways]['domain'])
1410                    try:
1411                        active = self.gateway_conf_file(self.current_gateways,
1412                                self.master, self.eid, self.gw_pubkey_base,
1413                                self.gw_secretkey_base,
1414                                self.active_end, tbparams, dtb, myname,
1415                                desthost, type)
1416                    except IOError, e:
1417                        raise service_error(service_error.internal,
1418                                "Failed to write config file for %s" % \
1419                                        self.current_gateway)
1420           
1421                    gw_pubkey = "%s/keys/%s" % \
1422                            (self.tmpdir, self.gw_pubkey_base)
1423                    gw_secretkey = "%s/keys/%s" % \
1424                            (self.tmpdir, self.gw_secretkey_base)
1425
1426                    pkfile = "%s/%s/%s" % \
1427                            ( self.tmpdir, self.current_gateways, 
1428                                    self.gw_pubkey_base)
1429                    skfile = "%s/%s/%s" % \
1430                            ( self.tmpdir, self.current_gateways, 
1431                                    self.gw_secretkey_base)
1432
1433                    if not os.path.exists(pkfile):
1434                        try:
1435                            self.copy_file(gw_pubkey, pkfile)
1436                        except IOError:
1437                            service_error(service_error.internal,
1438                                    "Failed to copy pubkey file")
1439
1440                    if active and not os.path.exists(skfile):
1441                        try:
1442                            self.copy_file(gw_secretkey, skfile)
1443                        except IOError:
1444                            service_error(service_error.internal,
1445                                    "Failed to copy secretkey file")
1446                return True
1447
1448    class shunt_to_file:
1449        """
1450        Simple class to write data between two regexps to a file.
1451        """
1452        def __init__(self, begin, end, filename):
1453            """
1454            Begin shunting on a match of begin, stop on end, send data to
1455            filename.
1456            """
1457            self.begin = re.compile(begin)
1458            self.end = re.compile(end)
1459            self.in_shunt = False
1460            self.file = None
1461            self.filename = filename
1462
1463        def __call__(self, line):
1464            """
1465            Call this on each line in the input that may be shunted.
1466            """
1467            if not self.in_shunt:
1468                if self.begin.match(line):
1469                    self.in_shunt = True
1470                    try:
1471                        self.file = open(self.filename, "w")
1472                    except:
1473                        self.file = None
1474                        raise
1475                    return True
1476                else:
1477                    return False
1478            else:
1479                if self.end.match(line):
1480                    if self.file: 
1481                        self.file.close()
1482                        self.file = None
1483                    self.in_shunt = False
1484                else:
1485                    if self.file:
1486                        print >>self.file, line
1487                return True
1488
1489    class shunt_to_list:
1490        """
1491        Same interface as shunt_to_file.  Data collected in self.list, one list
1492        element per line.
1493        """
1494        def __init__(self, begin, end):
1495            self.begin = re.compile(begin)
1496            self.end = re.compile(end)
1497            self.in_shunt = False
1498            self.list = [ ]
1499       
1500        def __call__(self, line):
1501            if not self.in_shunt:
1502                if self.begin.match(line):
1503                    self.in_shunt = True
1504                    return True
1505                else:
1506                    return False
1507            else:
1508                if self.end.match(line):
1509                    self.in_shunt = False
1510                else:
1511                    self.list.append(line)
1512                return True
1513
1514    class shunt_to_string:
1515        """
1516        Same interface as shunt_to_file.  Data collected in self.str, all in
1517        one string.
1518        """
1519        def __init__(self, begin, end):
1520            self.begin = re.compile(begin)
1521            self.end = re.compile(end)
1522            self.in_shunt = False
1523            self.str = ""
1524       
1525        def __call__(self, line):
1526            if not self.in_shunt:
1527                if self.begin.match(line):
1528                    self.in_shunt = True
1529                    return True
1530                else:
1531                    return False
1532            else:
1533                if self.end.match(line):
1534                    self.in_shunt = False
1535                else:
1536                    self.str += line
1537                return True
1538
1539    def create_experiment(self, req, fid):
1540        """
1541        The external interface to experiment creation called from the
1542        dispatcher.
1543
1544        Creates a working directory, splits the incoming description using the
1545        splitter script and parses out the avrious subsections using the
1546        lcasses above.  Once each sub-experiment is created, use pooled threads
1547        to instantiate them and start it all up.
1548        """
1549
1550        if not self.auth.check_attribute(fid, 'create'):
1551            raise service_error(service_error.access, "Create access denied")
1552
1553        try:
1554            tmpdir = tempfile.mkdtemp(prefix="split-")
1555        except IOError:
1556            raise service_error(service_error.internal, "Cannot create tmp dir")
1557
1558        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1559        gw_secretkey_base = "fed.%s" % self.ssh_type
1560        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1561        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1562        tclfile = tmpdir + "/experiment.tcl"
1563        tbparams = { }
1564        try:
1565            access_user = self.accessdb[fid]
1566        except KeyError:
1567            raise service_error(service_error.internal,
1568                    "Access map and authorizer out of sync in " + \
1569                            "create_experiment for fedid %s"  % fid)
1570
1571        pid = "dummy"
1572        gid = "dummy"
1573        # XXX
1574        fail_soft = False
1575
1576        try:
1577            os.mkdir(tmpdir+"/keys")
1578        except OSError:
1579            raise service_error(service_error.internal,
1580                    "Can't make temporary dir")
1581
1582        req = req.get('CreateRequestBody', None)
1583        if not req:
1584            raise service_error(service_error.req,
1585                    "Bad request format (no CreateRequestBody)")
1586        # The tcl parser needs to read a file so put the content into that file
1587        descr=req.get('experimentdescription', None)
1588        if descr:
1589            file_content=descr.get('ns2description', None)
1590            if file_content:
1591                try:
1592                    f = open(tclfile, 'w')
1593                    f.write(file_content)
1594                    f.close()
1595                except IOError:
1596                    raise service_error(service_error.internal,
1597                            "Cannot write temp experiment description")
1598            else:
1599                raise service_error(service_error.req, 
1600                        "Only ns2descriptions supported")
1601        else:
1602            raise service_error(service_error.req, "No experiment description")
1603
1604        if req.has_key('experimentID') and \
1605                req['experimentID'].has_key('localname'):
1606            eid = req['experimentID']['localname']
1607            self.state_lock.acquire()
1608            while (self.state.has_key(eid)):
1609                eid += random.choice(string.ascii_letters)
1610            # To avoid another thread picking this localname
1611            self.state[eid] = "placeholder"
1612            self.state_lock.release()
1613        else:
1614            eid = self.exp_stem
1615            for i in range(0,5):
1616                eid += random.choice(string.ascii_letters)
1617            self.state_lock.acquire()
1618            while (self.state.has_key(eid)):
1619                eid = self.exp_stem
1620                for i in range(0,5):
1621                    eid += random.choice(string.ascii_letters)
1622            # To avoid another thread picking this localname
1623            self.state[eid] = "placeholder"
1624            self.state_lock.release()
1625
1626        try: 
1627            # This catches exceptions to clear the placeholder if necessary
1628            try:
1629                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1630            except ValueError:
1631                raise service_error(service_error.server_config, 
1632                        "Bad key type (%s)" % self.ssh_type)
1633
1634            user = req.get('user', None)
1635            if user == None:
1636                raise service_error(service_error.req, "No user")
1637
1638            master = req.get('master', None)
1639            if not master:
1640                raise service_error(service_error.req,
1641                        "No master testbed label")
1642            export_project = req.get('exportProject', None)
1643            if not export_project:
1644                raise service_error(service_error.req, "No export project")
1645           
1646            if self.splitter_url:
1647                self.log.debug("Calling remote splitter at %s" % \
1648                        self.splitter_url)
1649                split_data = self.remote_splitter(self.splitter_url,
1650                        file_content, master)
1651            else:
1652                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1653                    str(self.muxmax), '-m', master]
1654
1655                if self.fedkit:
1656                    tclcmd.append('-k')
1657
1658                if self.gatewaykit:
1659                    tclcmd.append('-K')
1660
1661                tclcmd.extend([pid, gid, eid, tclfile])
1662
1663                self.log.debug("running local splitter %s", " ".join(tclcmd))
1664                tclparser = Popen(tclcmd, stdout=PIPE)
1665                split_data = tclparser.stdout
1666
1667            allocated = { }         # Testbeds we can access
1668            started = { }           # Testbeds where a sub-experiment started
1669                                # successfully
1670
1671            # Objects to parse the splitter output (defined above)
1672            parse_current_testbed = self.current_testbed(eid, tmpdir,
1673                    self.fedkit, self.gatewaykit)
1674            parse_allbeds = self.allbeds(self.get_access)
1675            parse_gateways = self.gateways(eid, master, tmpdir,
1676                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1677                    self.fedkit)
1678            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1679                        "^#\s+End\s+Vtopo")
1680            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1681                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1682            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1683                    "^#\s+End\s+tarfiles")
1684            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1685                    "^#\s+End\s+rpms")
1686
1687            # Working on the split data
1688            for line in split_data:
1689                line = line.rstrip()
1690                if parse_current_testbed(line, master, allocated, tbparams):
1691                    continue
1692                elif parse_allbeds(line, user, tbparams, master, export_project,
1693                        access_user):
1694                    continue
1695                elif parse_gateways(line, allocated, tbparams):
1696                    continue
1697                elif parse_vtopo(line):
1698                    continue
1699                elif parse_hostnames(line):
1700                    continue
1701                elif parse_tarfiles(line):
1702                    continue
1703                elif parse_rpms(line):
1704                    continue
1705                else:
1706                    raise service_error(service_error.internal, 
1707                            "Bad tcl parse? %s" % line)
1708            # Virtual topology and visualization
1709            vtopo = self.gentopo(parse_vtopo.str)
1710            if not vtopo:
1711                raise service_error(service_error.internal, 
1712                        "Failed to generate virtual topology")
1713
1714            vis = self.genviz(vtopo)
1715            if not vis:
1716                raise service_error(service_error.internal, 
1717                        "Failed to generate visualization")
1718           
1719            # save federant information
1720            for k in allocated.keys():
1721                tbparams[k]['federant'] = {\
1722                        'name': [ { 'localname' : eid} ],\
1723                        'emulab': tbparams[k]['emulab'],\
1724                        'allocID' : tbparams[k]['allocID'],\
1725                        'master' : k == master,\
1726                    }
1727
1728
1729            # Copy tarfiles and rpms needed at remote sites into a staging area
1730            try:
1731                if self.fedkit:
1732                    parse_tarfiles.list.append(self.fedkit)
1733                if self.gatewaykit:
1734                    parse_tarfiles.list.append(self.gatewaykit)
1735                for t in parse_tarfiles.list:
1736                    if not os.path.exists("%s/tarfiles" % tmpdir):
1737                        os.mkdir("%s/tarfiles" % tmpdir)
1738                    self.copy_file(t, "%s/tarfiles/%s" % \
1739                            (tmpdir, os.path.basename(t)))
1740                for r in parse_rpms.list:
1741                    if not os.path.exists("%s/rpms" % tmpdir):
1742                        os.mkdir("%s/rpms" % tmpdir)
1743                    self.copy_file(r, "%s/rpms/%s" % \
1744                            (tmpdir, os.path.basename(r)))
1745            except IOError, e:
1746                raise service_error(service_error.internal, 
1747                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1748
1749        except service_error, e:
1750            # If something goes wrong in the parse (usually an access error)
1751            # clear the placeholder state.  From here on out the code delays
1752            # exceptions.
1753            self.state_lock.acquire()
1754            del self.state[eid]
1755            self.state_lock.release()
1756            raise e
1757
1758        thread_pool = self.thread_pool(self.nthreads)
1759        threads = [ ]
1760
1761        for tb in [ k for k in allocated.keys() if k != master]:
1762            # Create and start a thread to start the segment, and save it to
1763            # get the return value later
1764            thread_pool.wait_for_slot()
1765            t  = self.pooled_thread(target=self.start_segment, 
1766                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1767                    pdata=thread_pool, trace_file=self.trace_file)
1768            threads.append(t)
1769            t.start()
1770
1771        # Wait until all finish
1772        thread_pool.wait_for_all_done()
1773
1774        # If none failed, start the master
1775        failed = [ t.getName() for t in threads if not t.rv ]
1776
1777        if len(failed) == 0:
1778            if not self.start_segment(master, eid, tbparams, tmpdir):
1779                failed.append(master)
1780
1781        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1782        # If one failed clean up, unless fail_soft is set
1783        if failed:
1784            if not fail_soft:
1785                thread_pool.clear()
1786                for tb in succeeded:
1787                    # Create and start a thread to stop the segment
1788                    thread_pool.wait_for_slot()
1789                    t  = self.pooled_thread(target=self.stop_segment, 
1790                            args=(tb, eid, tbparams), name=tb,
1791                            pdata=thread_pool, trace_file=self.trace_file)
1792                    t.start()
1793                # Wait until all finish
1794                thread_pool.wait_for_all_done()
1795
1796                # release the allocations
1797                for tb in tbparams.keys():
1798                    self.release_access(tb, tbparams[tb]['allocID'])
1799                # Remove the placeholder
1800                self.state_lock.acquire()
1801                del self.state[eid]
1802                self.state_lock.release()
1803
1804                raise service_error(service_error.federant,
1805                    "Swap in failed on %s" % ",".join(failed))
1806        else:
1807            self.log.info("[start_segment]: Experiment %s started" % eid)
1808
1809        # Generate an ID for the experiment (slice) and a certificate that the
1810        # allocator can use to prove they own it.  We'll ship it back through
1811        # the encrypted connection.
1812        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1813
1814        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1815
1816        # Walk up tmpdir, deleting as we go
1817        for path, dirs, files in os.walk(tmpdir, topdown=False):
1818            for f in files:
1819                os.remove(os.path.join(path, f))
1820            for d in dirs:
1821                os.rmdir(os.path.join(path, d))
1822        os.rmdir(tmpdir)
1823
1824        # The deepcopy prevents the allocation ID and other binaries from being
1825        # translated into other formats
1826        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1827                for tb in tbparams.keys() \
1828                    if tbparams[tb].has_key('federant') ],\
1829                    'vtopo': vtopo,\
1830                    'vis' : vis,
1831                    'experimentID' : [\
1832                            { 'fedid': copy.copy(expid) }, \
1833                            { 'localname': eid },\
1834                        ],\
1835                    'experimentAccess': { 'X509' : expcert },\
1836                }
1837        # remove the allocationID info from each federant
1838        for f in resp['federant']:
1839            if f.has_key('allocID'): del f['allocID']
1840
1841        # Insert the experiment into our state and update the disk copy
1842        self.state_lock.acquire()
1843        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1844                for tb in tbparams.keys() \
1845                    if tbparams[tb].has_key('federant') ],\
1846                    'vtopo': vtopo,\
1847                    'vis' : vis,
1848                    'owner': fid,
1849                    'experimentID' : [\
1850                            { 'fedid': expid }, { 'localname': eid },\
1851                        ],\
1852                }
1853        self.state[eid] = self.state[expid]
1854        if self.state_filename: self.write_state()
1855        self.state_lock.release()
1856
1857        self.auth.set_attribute(fid, expid)
1858        self.auth.set_attribute(expid, expid)
1859
1860        if not failed:
1861            return resp
1862        else:
1863            raise service_error(service_error.partial, \
1864                    "Partial swap in on %s" % ",".join(succeeded))
1865
1866    def check_experiment_access(self, fid, key):
1867        """
1868        Confirm that the fid has access to the experiment.  Though a request
1869        may be made in terms of a local name, the access attribute is always
1870        the experiment's fedid.
1871        """
1872        if not isinstance(key, fedid):
1873            self.state_lock.acquire()
1874            if self.state.has_key(key):
1875                if isinstance(self.state[key], dict):
1876                    try:
1877                        kl = [ f['fedid'] for f in \
1878                                self.state[key]['experimentID']\
1879                                    if f.has_key('fedid') ]
1880                    except KeyError:
1881                        self.state_lock.release()
1882                        raise service_error(service_error.internal, 
1883                                "No fedid for experiment %s when checking " +\
1884                                        "access(!?)" % key)
1885                    if len(kl) == 1:
1886                        key = kl[0]
1887                    else:
1888                        self.state_lock.release()
1889                        raise service_error(service_error.internal, 
1890                                "multiple fedids for experiment %s when " +\
1891                                        "checking access(!?)" % key)
1892                elif isinstance(self.state[key], str):
1893                    self.state_lock.release()
1894                    raise service_error(service_error.internal, 
1895                            ("experiment %s is placeholder.  " +\
1896                                    "Creation in progress or aborted oddly") \
1897                                    % key)
1898                else:
1899                    self.state_lock.release()
1900                    raise service_error(service_error.internal, 
1901                            "Unexpected state for %s" % key)
1902
1903            else:
1904                self.state_lock.release()
1905                raise service_error(service_error.access, "Access Denied")
1906            self.state_lock.release()
1907
1908        if self.auth.check_attribute(fid, key):
1909            return True
1910        else:
1911            raise service_error(service_error.access, "Access Denied")
1912
1913
1914
1915    def get_vtopo(self, req, fid):
1916        """
1917        Return the stored virtual topology for this experiment
1918        """
1919        rv = None
1920
1921        req = req.get('VtopoRequestBody', None)
1922        if not req:
1923            raise service_error(service_error.req,
1924                    "Bad request format (no VtopoRequestBody)")
1925        exp = req.get('experiment', None)
1926        if exp:
1927            if exp.has_key('fedid'):
1928                key = exp['fedid']
1929                keytype = "fedid"
1930            elif exp.has_key('localname'):
1931                key = exp['localname']
1932                keytype = "localname"
1933            else:
1934                raise service_error(service_error.req, "Unknown lookup type")
1935        else:
1936            raise service_error(service_error.req, "No request?")
1937
1938        self.check_experiment_access(fid, key)
1939
1940        self.state_lock.acquire()
1941        if self.state.has_key(key):
1942            rv = { 'experiment' : {keytype: key },\
1943                    'vtopo': self.state[key]['vtopo'],\
1944                }
1945        self.state_lock.release()
1946
1947        if rv: return rv
1948        else: raise service_error(service_error.req, "No such experiment")
1949
1950    def get_vis(self, req, fid):
1951        """
1952        Return the stored visualization for this experiment
1953        """
1954        rv = None
1955
1956        req = req.get('VisRequestBody', None)
1957        if not req:
1958            raise service_error(service_error.req,
1959                    "Bad request format (no VisRequestBody)")
1960        exp = req.get('experiment', None)
1961        if exp:
1962            if exp.has_key('fedid'):
1963                key = exp['fedid']
1964                keytype = "fedid"
1965            elif exp.has_key('localname'):
1966                key = exp['localname']
1967                keytype = "localname"
1968            else:
1969                raise service_error(service_error.req, "Unknown lookup type")
1970        else:
1971            raise service_error(service_error.req, "No request?")
1972
1973        self.check_experiment_access(fid, key)
1974
1975        self.state_lock.acquire()
1976        if self.state.has_key(key):
1977            rv =  { 'experiment' : {keytype: key },\
1978                    'vis': self.state[key]['vis'],\
1979                    }
1980        self.state_lock.release()
1981
1982        if rv: return rv
1983        else: raise service_error(service_error.req, "No such experiment")
1984
1985    def get_info(self, req, fid):
1986        """
1987        Return all the stored info about this experiment
1988        """
1989        rv = None
1990
1991        req = req.get('InfoRequestBody', None)
1992        if not req:
1993            raise service_error(service_error.req,
1994                    "Bad request format (no VisRequestBody)")
1995        exp = req.get('experiment', None)
1996        if exp:
1997            if exp.has_key('fedid'):
1998                key = exp['fedid']
1999                keytype = "fedid"
2000            elif exp.has_key('localname'):
2001                key = exp['localname']
2002                keytype = "localname"
2003            else:
2004                raise service_error(service_error.req, "Unknown lookup type")
2005        else:
2006            raise service_error(service_error.req, "No request?")
2007
2008        self.check_experiment_access(fid, key)
2009
2010        # The state may be massaged by the service function that called
2011        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2012        # state.
2013        self.state_lock.acquire()
2014        if self.state.has_key(key):
2015            rv = copy.deepcopy(self.state[key])
2016        self.state_lock.release()
2017        # Remove the owner info
2018        del rv['owner']
2019        # remove the allocationID info from each federant
2020        for f in rv['federant']:
2021            if f.has_key('allocID'): del f['allocID']
2022
2023        if rv: return rv
2024        else: raise service_error(service_error.req, "No such experiment")
2025
2026
2027    def terminate_experiment(self, req, fid):
2028        """
2029        Swap this experiment out on the federants and delete the shared
2030        information
2031        """
2032        tbparams = { }
2033        req = req.get('TerminateRequestBody', None)
2034        if not req:
2035            raise service_error(service_error.req,
2036                    "Bad request format (no TerminateRequestBody)")
2037        exp = req.get('experiment', None)
2038        if exp:
2039            if exp.has_key('fedid'):
2040                key = exp['fedid']
2041                keytype = "fedid"
2042            elif exp.has_key('localname'):
2043                key = exp['localname']
2044                keytype = "localname"
2045            else:
2046                raise service_error(service_error.req, "Unknown lookup type")
2047        else:
2048            raise service_error(service_error.req, "No request?")
2049
2050        self.check_experiment_access(fid, key)
2051
2052        self.state_lock.acquire()
2053        fed_exp = self.state.get(key, None)
2054
2055        if fed_exp:
2056            # This branch of the conditional holds the lock to generate a
2057            # consistent temporary tbparams variable to deallocate experiments.
2058            # It releases the lock to do the deallocations and reacquires it to
2059            # remove the experiment state when the termination is complete.
2060            ids = []
2061            #  experimentID is a list of dicts that are self-describing
2062            #  identifiers.  This finds all the fedids and localnames - the
2063            #  keys of self.state - and puts them into ids.
2064            for id in fed_exp.get('experimentID', []):
2065                if id.has_key('fedid'): ids.append(id['fedid'])
2066                if id.has_key('localname'): ids.append(id['localname'])
2067
2068            # Construct enough of the tbparams to make the stop_segment calls
2069            # work
2070            for fed in fed_exp['federant']:
2071                try:
2072                    for e in fed['name']:
2073                        eid = e.get('localname', None)
2074                        if eid: break
2075                    else:
2076                        continue
2077
2078                    p = fed['emulab']['project']
2079
2080                    project = p['name']['localname']
2081                    tb = p['testbed']['localname']
2082                    user = p['user'][0]['userID']['localname']
2083
2084                    domain = fed['emulab']['domain']
2085                    host  = fed['emulab']['ops']
2086                    aid = fed['allocID']
2087                except KeyError, e:
2088                    continue
2089                tbparams[tb] = {\
2090                        'user': user,\
2091                        'domain': domain,\
2092                        'project': project,\
2093                        'host': host,\
2094                        'eid': eid,\
2095                        'aid': aid,\
2096                    }
2097            self.state_lock.release()
2098
2099            # Stop everyone.
2100            thread_pool = self.thread_pool(self.nthreads)
2101            for tb in tbparams.keys():
2102                # Create and start a thread to stop the segment
2103                thread_pool.wait_for_slot()
2104                t  = self.pooled_thread(target=self.stop_segment, 
2105                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2106                        pdata=thread_pool, trace_file=self.trace_file)
2107                t.start()
2108            # Wait for completions
2109            thread_pool.wait_for_all_done()
2110
2111            # release the allocations
2112            for tb in tbparams.keys():
2113                self.release_access(tb, tbparams[tb]['aid'])
2114
2115            # Remove the terminated experiment
2116            self.state_lock.acquire()
2117            for id in ids:
2118                if self.state.has_key(id): del self.state[id]
2119
2120            if self.state_filename: self.write_state()
2121            self.state_lock.release()
2122
2123            return { 'experiment': exp }
2124        else:
2125            # Don't forget to release the lock
2126            self.state_lock.release()
2127            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.