source: fedd/federation/experiment_control.py @ c570f7e

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since c570f7e was c570f7e, checked in by Ted Faber <faber@…>, 15 years ago

another timeout bug - check the right experiment

  • Property mode set to 100644
File size: 90.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import *
21from subprocess import *
22
23from util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from service_error import service_error
27
28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
35class experiment_control_local:
36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
42
43    class ssh_cmd_timeout(RuntimeError): pass
44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
70   
71    class thread_pool:
72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
148
149    class pooled_thread(Thread):
150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
188
189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
192
193    def __init__(self, config=None, auth=None):
194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
214        self.list_log = experiment_control_local.list_log
215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    def copy_file(self, src, dest, size=1024):
356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
368
369    # Call while holding self.state_lock
370    def write_state(self):
371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
390
391    # Call while holding self.state_lock
392    def read_state(self):
393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
425
426
427    def read_accessdb(self, accessdb_file):
428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
482
483    def read_mapdb(self, file):
484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
508
509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
530
531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
550            """
551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
555
556            try:
557                dnull = open("/dev/null", "r")
558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
568                else:
569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
580                    else:
581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
585                else:
586                    return sub.wait() == 0
587            else:
588                if timeout == 0:
589                    self.log.debug("debug timeout raised on %s " % sh_str)
590                    raise self.ssh_cmd_timeout()
591                else:
592                    return True
593
594    class start_segment(emulab_segment):
595        def __init__(self, log=None, keyfile=None, debug=False):
596            experiment_control_local.emulab_segment.__init__(self,
597                    log=log, keyfile=keyfile, debug=debug)
598
599        def create_config_tree(self, src_dir, dest_dir, script):
600            """
601            Append commands to script that will create the directory hierarchy
602            on the remote federant.
603            """
604
605            if os.path.isdir(src_dir):
606                print >>script, "mkdir -p %s" % dest_dir
607                print >>script, "chmod 770 %s" % dest_dir
608
609                for f in os.listdir(src_dir):
610                    if os.path.isdir(f):
611                        self.create_config_tree("%s/%s" % (src_dir, f), 
612                                "%s/%s" % (dest_dir, f), script)
613            else:
614                self.log.debug("[create_config_tree]: Not a directory: %s" \
615                        % src_dir)
616
617        def ship_configs(self, host, user, src_dir, dest_dir):
618            """
619            Copy federant-specific configuration files to the federant.
620            """
621            for f in os.listdir(src_dir):
622                if os.path.isdir(f):
623                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
624                            "%s/%s" % (dest_dir, f)):
625                        return False
626                else:
627                    if not self.scp_file("%s/%s" % (src_dir, f), 
628                            user, host, dest_dir):
629                        return False
630            return True
631
632        def get_state(self, user, host, tb, pid, eid):
633            # command to test experiment state
634            expinfo_exec = "/usr/testbed/bin/expinfo" 
635            # Regular expressions to parse the expinfo response
636            state_re = re.compile("State:\s+(\w+)")
637            no_exp_re = re.compile("^No\s+such\s+experiment")
638            swapping_re = re.compile("^No\s+information\s+available.")
639            state = None    # Experiment state parsed from expinfo
640            # The expinfo ssh command.  Note the identity restriction to use
641            # only the identity provided in the pubkey given.
642            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
643                    'StrictHostKeyChecking yes', '-i', 
644                    self.ssh_privkey_file, "%s@%s" % (user, host), 
645                    expinfo_exec, pid, eid]
646
647            dev_null = None
648            try:
649                dev_null = open("/dev/null", "a")
650            except IOError, e:
651                self.log.error("[get_state]: can't open /dev/null: %s" %e)
652
653            if self.debug:
654                state = 'swapped'
655                rv = 0
656            else:
657                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
658                        close_fds=True)
659                for line in status.stdout:
660                    m = state_re.match(line)
661                    if m: state = m.group(1)
662                    else:
663                        for reg, st in ((no_exp_re, "none"),
664                                (swapping_re, "swapping")):
665                            m = reg.match(line)
666                            if m: state = st
667                rv = status.wait()
668
669            # If the experiment is not present the subcommand returns a
670            # non-zero return value.  If we successfully parsed a "none"
671            # outcome, ignore the return code.
672            if rv != 0 and state != 'none':
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            elif state not in ('active', 'swapped', 'swapping', 'none'):
677                raise service_error(service_error.internal,
678                        "Cannot get status of segment %s:%s/%s" % \
679                                (tb, pid, eid))
680            else: return state
681
682
683        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
684            """
685            Start a sub-experiment on a federant.
686
687            Get the current state, modify or create as appropriate, ship data
688            and configs and start the experiment.  There are small ordering
689            differences based on the initial state of the sub-experiment.
690            """
691            # ops node in the federant
692            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
693            user = tbparams[tb]['user']     # federant user
694            pid = tbparams[tb]['project']   # federant project
695            # XXX
696            base_confs = ( "hosts",)
697            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
698            # Configuration directories on the remote machine
699            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
700            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
701            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
702
703            state = self.get_state(user, host, tb, pid, eid)
704
705            self.log.debug("[start_segment]: %s: %s" % (tb, state))
706            self.log.info("[start_segment]:transferring experiment to %s" % tb)
707
708            if not self.scp_file("%s/%s/%s" % \
709                    (tmpdir, tb, tclfile), user, host):
710                return False
711           
712            if state == 'none':
713                # Create a null copy of the experiment so that we capture any
714                # logs there if the modify fails.  Emulab software discards the
715                # logs from a failed startexp
716                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
717                    return False
718                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
719                timedout = False
720                try:
721                    if not self.ssh_cmd(user, host,
722                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
723                            "-e %s null.tcl") % (pid, eid), "startexp",
724                            timeout=60 * 10):
725                        return False
726                except self.ssh_cmd_timeout:
727                    timedout = True
728
729                if timedout:
730                    state = self.get_state(user, host, tb, pid, eid)
731                    if state != "swapped":
732                        return False
733
734           
735            # Open up a temporary file to contain a script for setting up the
736            # filespace for the new experiment.
737            self.log.info("[start_segment]: creating script file")
738            try:
739                sf, scriptname = tempfile.mkstemp()
740                scriptfile = os.fdopen(sf, 'w')
741            except IOError:
742                return False
743
744            scriptbase = os.path.basename(scriptname)
745
746            # Script the filesystem changes
747            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
748            # Clear and create the tarfiles and rpm directories
749            for d in (tarfiles_dir, rpms_dir):
750                print >>scriptfile, "/bin/rm -rf %s/*" % d
751                print >>scriptfile, "mkdir -p %s" % d
752            print >>scriptfile, 'mkdir -p %s' % proj_dir
753            self.create_config_tree("%s/%s" % (tmpdir, tb),
754                    proj_dir, scriptfile)
755            if os.path.isdir("%s/tarfiles" % tmpdir):
756                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
757                        scriptfile)
758            if os.path.isdir("%s/rpms" % tmpdir):
759                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
760                        scriptfile)
761            print >>scriptfile, "rm -f %s" % scriptbase
762            scriptfile.close()
763
764            # Move the script to the remote machine
765            # XXX: could collide tempfile names on the remote host
766            if self.scp_file(scriptname, user, host, scriptbase):
767                os.remove(scriptname)
768            else:
769                return False
770
771            # Execute the script (and the script's last line deletes it)
772            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
773                return False
774
775            for f in base_confs:
776                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
777                        "%s/%s" % (proj_dir, f)):
778                    return False
779            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
780                    proj_dir):
781                return False
782            if os.path.isdir("%s/tarfiles" % tmpdir):
783                if not self.ship_configs(host, user,
784                        "%s/tarfiles" % tmpdir, tarfiles_dir):
785                    return False
786            if os.path.isdir("%s/rpms" % tmpdir):
787                if not self.ship_configs(host, user,
788                        "%s/rpms" % tmpdir, tarfiles_dir):
789                    return False
790            # Stage the new configuration (active experiments will stay swapped
791            # in now)
792            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
793            try:
794                if not self.ssh_cmd(user, host,
795                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
796                                (pid, eid, tclfile),
797                        "modexp", timeout= 60 * 10):
798                    return False
799            except self.ssh_cmd_timeout:
800                self.log.error("Modify command failed to complete in time")
801                # There's really no way to see if this succeeded or failed, so
802                # if it hangs, assume the worst.
803                return False
804            # Active experiments are still swapped, this swaps the others in.
805            if state != 'active':
806                self.log.info("[start_segment]: Swapping %s in on %s" % \
807                        (eid, tb))
808                timedout = False
809                try:
810                    if not self.ssh_cmd(user, host,
811                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
812                            "swapexp", timeout=10*60):
813                        return False
814                except self.ssh_cmd_timeout:
815                    timedout = True
816               
817                # If the command was terminated, but completed successfully,
818                # report success.
819                if timedout:
820                    self.log.debug("[start_segment]: swapin timed out " +\
821                            "checking state")
822                    state = self.get_state(user, host, tb, pid, eid)
823                    self.log.debug("[start_segment]: state is %s" % state)
824                    return state == 'active'
825            # Everything has gone OK.
826            return True
827
828    class stop_segment(emulab_segment):
829        def __init__(self, log=None, keyfile=None, debug=False):
830            experiment_control_local.emulab_segment.__init__(self,
831                    log=log, keyfile=keyfile, debug=debug)
832
833        def __call__(self, tb, eid, tbparams):
834            """
835            Stop a sub experiment by calling swapexp on the federant
836            """
837            user = tbparams[tb]['user']
838            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
839            pid = tbparams[tb]['project']
840
841            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
842            rv = False
843            try:
844                # Clean out tar files: we've gone over quota in the past
845                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
846                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
847                        (pid, eid))
848                rv = self.ssh_cmd(user, host,
849                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
850            except self.ssh_cmd_timeout:
851                rv = False
852            return rv
853
854       
855    def generate_ssh_keys(self, dest, type="rsa" ):
856        """
857        Generate a set of keys for the gateways to use to talk.
858
859        Keys are of type type and are stored in the required dest file.
860        """
861        valid_types = ("rsa", "dsa")
862        t = type.lower();
863        if t not in valid_types: raise ValueError
864        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
865
866        try:
867            trace = open("/dev/null", "w")
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Cannot open /dev/null??");
871
872        # May raise CalledProcessError
873        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
874        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
875        if rv != 0:
876            raise service_error(service_error.internal, 
877                    "Cannot generate nonce ssh keys.  %s return code %d" \
878                            % (self.ssh_keygen, rv))
879
880    def gentopo(self, str):
881        """
882        Generate the topology dtat structure from the splitter's XML
883        representation of it.
884
885        The topology XML looks like:
886            <experiment>
887                <nodes>
888                    <node><vname></vname><ips>ip1:ip2</ips></node>
889                </nodes>
890                <lans>
891                    <lan>
892                        <vname></vname><vnode></vnode><ip></ip>
893                        <bandwidth></bandwidth><member>node:port</member>
894                    </lan>
895                </lans>
896        """
897        class topo_parse:
898            """
899            Parse the topology XML and create the dats structure.
900            """
901            def __init__(self):
902                # Typing of the subelements for data conversion
903                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
904                self.int_subelements = ( 'bandwidth',)
905                self.float_subelements = ( 'delay',)
906                # The final data structure
907                self.nodes = [ ]
908                self.lans =  [ ]
909                self.topo = { \
910                        'node': self.nodes,\
911                        'lan' : self.lans,\
912                    }
913                self.element = { }  # Current element being created
914                self.chars = ""     # Last text seen
915
916            def end_element(self, name):
917                # After each sub element the contents is added to the current
918                # element or to the appropriate list.
919                if name == 'node':
920                    self.nodes.append(self.element)
921                    self.element = { }
922                elif name == 'lan':
923                    self.lans.append(self.element)
924                    self.element = { }
925                elif name in self.str_subelements:
926                    self.element[name] = self.chars
927                    self.chars = ""
928                elif name in self.int_subelements:
929                    self.element[name] = int(self.chars)
930                    self.chars = ""
931                elif name in self.float_subelements:
932                    self.element[name] = float(self.chars)
933                    self.chars = ""
934
935            def found_chars(self, data):
936                self.chars += data.rstrip()
937
938
939        tp = topo_parse();
940        parser = xml.parsers.expat.ParserCreate()
941        parser.EndElementHandler = tp.end_element
942        parser.CharacterDataHandler = tp.found_chars
943
944        parser.Parse(str)
945
946        return tp.topo
947       
948
949    def genviz(self, topo):
950        """
951        Generate the visualization the virtual topology
952        """
953
954        neato = "/usr/local/bin/neato"
955        # These are used to parse neato output and to create the visualization
956        # file.
957        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
958        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
959                "%s</type></node>"
960
961        try:
962            # Node names
963            nodes = [ n['vname'] for n in topo['node'] ]
964            topo_lans = topo['lan']
965        except KeyError:
966            raise service_error(service_error.internal, "Bad topology")
967
968        lans = { }
969        links = { }
970
971        # Walk through the virtual topology, organizing the connections into
972        # 2-node connections (links) and more-than-2-node connections (lans).
973        # When a lan is created, it's added to the list of nodes (there's a
974        # node in the visualization for the lan).
975        for l in topo_lans:
976            if links.has_key(l['vname']):
977                if len(links[l['vname']]) < 2:
978                    links[l['vname']].append(l['vnode'])
979                else:
980                    nodes.append(l['vname'])
981                    lans[l['vname']] = links[l['vname']]
982                    del links[l['vname']]
983                    lans[l['vname']].append(l['vnode'])
984            elif lans.has_key(l['vname']):
985                lans[l['vname']].append(l['vnode'])
986            else:
987                links[l['vname']] = [ l['vnode'] ]
988
989
990        # Open up a temporary file for dot to turn into a visualization
991        try:
992            df, dotname = tempfile.mkstemp()
993            dotfile = os.fdopen(df, 'w')
994        except IOError:
995            raise service_error(service_error.internal,
996                    "Failed to open file in genviz")
997
998        # Generate a dot/neato input file from the links, nodes and lans
999        try:
1000            print >>dotfile, "graph G {"
1001            for n in nodes:
1002                print >>dotfile, '\t"%s"' % n
1003            for l in links.keys():
1004                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1005            for l in lans.keys():
1006                for n in lans[l]:
1007                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1008            print >>dotfile, "}"
1009            dotfile.close()
1010        except TypeError:
1011            raise service_error(service_error.internal,
1012                    "Single endpoint link in vtopo")
1013        except IOError:
1014            raise service_error(service_error.internal, "Cannot write dot file")
1015
1016        # Use dot to create a visualization
1017        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1018                '-Gpack=true', dotname], stdout=PIPE, close_fds=True)
1019
1020        # Translate dot to vis format
1021        vis_nodes = [ ]
1022        vis = { 'node': vis_nodes }
1023        for line in dot.stdout:
1024            m = vis_re.match(line)
1025            if m:
1026                vn = m.group(1)
1027                vis_node = {'name': vn, \
1028                        'x': float(m.group(2)),\
1029                        'y' : float(m.group(3)),\
1030                    }
1031                if vn in links.keys() or vn in lans.keys():
1032                    vis_node['type'] = 'lan'
1033                else:
1034                    vis_node['type'] = 'node'
1035                vis_nodes.append(vis_node)
1036        rv = dot.wait()
1037
1038        os.remove(dotname)
1039        if rv == 0 : return vis
1040        else: return None
1041
1042    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1043            access_user):
1044        """
1045        Get access to testbed through fedd and set the parameters for that tb
1046        """
1047        uri = self.tbmap.get(tb, None)
1048        if not uri:
1049            raise service_error(serice_error.server_config, 
1050                    "Unknown testbed: %s" % tb)
1051
1052        # currently this lumps all users into one service access group
1053        service_keys = [ a for u in user \
1054                for a in u.get('access', []) \
1055                    if a.has_key('sshPubkey')]
1056
1057        if len(service_keys) == 0:
1058            raise service_error(service_error.req, 
1059                    "Must have at least one SSH pubkey for services")
1060
1061
1062        for p, u in access_user:
1063            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1064                    "to %s") %  ((p or "None"), u, uri))
1065
1066            if p:
1067                # Request with user and project specified
1068                req = {\
1069                        'destinationTestbed' : { 'uri' : uri },
1070                        'project': { 
1071                            'name': {'localname': p},
1072                            'user': [ {'userID': { 'localname': u } } ],
1073                            },
1074                        'user':  user,
1075                        'allocID' : { 'localname': 'test' },
1076                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1077                        'serviceAccess' : service_keys
1078                    }
1079            else:
1080                # Request with only user specified
1081                req = {\
1082                        'destinationTestbed' : { 'uri' : uri },
1083                        'user':  [ {'userID': { 'localname': u } } ],
1084                        'allocID' : { 'localname': 'test' },
1085                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1086                        'serviceAccess' : service_keys
1087                    }
1088
1089            if tb == master:
1090                # NB, the export_project parameter is a dict that includes
1091                # the type
1092                req['exportProject'] = export_project
1093
1094            # node resources if any
1095            if nodes != None and len(nodes) > 0:
1096                rnodes = [ ]
1097                for n in nodes:
1098                    rn = { }
1099                    image, hw, count = n.split(":")
1100                    if image: rn['image'] = [ image ]
1101                    if hw: rn['hardware'] = [ hw ]
1102                    if count and int(count) >0 : rn['count'] = int(count)
1103                    rnodes.append(rn)
1104                req['resources']= { }
1105                req['resources']['node'] = rnodes
1106
1107            try:
1108                if self.local_access.has_key(uri):
1109                    # Local access call
1110                    req = { 'RequestAccessRequestBody' : req }
1111                    r = self.local_access[uri].RequestAccess(req, 
1112                            fedid(file=self.cert_file))
1113                    r = { 'RequestAccessResponseBody' : r }
1114                else:
1115                    r = self.call_RequestAccess(uri, req, 
1116                            self.cert_file, self.cert_pwd, self.trusted_certs)
1117            except service_error, e:
1118                if e.code == service_error.access:
1119                    self.log.debug("[get_access] Access denied")
1120                    r = None
1121                    continue
1122                else:
1123                    raise e
1124
1125            if r.has_key('RequestAccessResponseBody'):
1126                # Through to here we have a valid response, not a fault.
1127                # Access denied is a fault, so something better or worse than
1128                # access denied has happened.
1129                r = r['RequestAccessResponseBody']
1130                self.log.debug("[get_access] Access granted")
1131                break
1132            else:
1133                raise service_error(service_error.protocol,
1134                        "Bad proxy response")
1135       
1136        if not r:
1137            raise service_error(service_error.access, 
1138                    "Access denied by %s (%s)" % (tb, uri))
1139
1140        e = r['emulab']
1141        p = e['project']
1142        tbparam[tb] = { 
1143                "boss": e['boss'],
1144                "host": e['ops'],
1145                "domain": e['domain'],
1146                "fs": e['fileServer'],
1147                "eventserver": e['eventServer'],
1148                "project": unpack_id(p['name']),
1149                "emulab" : e,
1150                "allocID" : r['allocID'],
1151                }
1152        # Make the testbed name be the label the user applied
1153        p['testbed'] = {'localname': tb }
1154
1155        for u in p['user']:
1156            role = u.get('role', None)
1157            if role == 'experimentCreation':
1158                tbparam[tb]['user'] = unpack_id(u['userID'])
1159                break
1160        else:
1161            raise service_error(service_error.internal, 
1162                    "No createExperimentUser from %s" %tb)
1163
1164        # Add attributes to barameter space.  We don't allow attributes to
1165        # overlay any parameters already installed.
1166        for a in e['fedAttr']:
1167            try:
1168                if a['attribute'] and isinstance(a['attribute'], basestring)\
1169                        and not tbparam[tb].has_key(a['attribute'].lower()):
1170                    tbparam[tb][a['attribute'].lower()] = a['value']
1171            except KeyError:
1172                self.log.error("Bad attribute in response: %s" % a)
1173       
1174    def release_access(self, tb, aid):
1175        """
1176        Release access to testbed through fedd
1177        """
1178
1179        uri = self.tbmap.get(tb, None)
1180        if not uri:
1181            raise service_error(serice_error.server_config, 
1182                    "Unknown testbed: %s" % tb)
1183
1184        if self.local_access.has_key(uri):
1185            resp = self.local_access[uri].ReleaseAccess(\
1186                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1187                    fedid(file=self.cert_file))
1188            resp = { 'ReleaseAccessResponseBody': resp } 
1189        else:
1190            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1191                    self.cert_file, self.cert_pwd, self.trusted_certs)
1192
1193        # better error coding
1194
1195    def remote_splitter(self, uri, desc, master):
1196
1197        req = {
1198                'description' : { 'ns2description': desc },
1199                'master': master,
1200                'include_fedkit': bool(self.fedkit),
1201                'include_gatewaykit': bool(self.gatewaykit)
1202            }
1203
1204        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1205                self.trusted_certs)
1206
1207        if r.has_key('Ns2SplitResponseBody'):
1208            r = r['Ns2SplitResponseBody']
1209            if r.has_key('output'):
1210                return r['output'].splitlines()
1211            else:
1212                raise service_error(service_error.protocol, 
1213                        "Bad splitter response (no output)")
1214        else:
1215            raise service_error(service_error.protocol, "Bad splitter response")
1216       
1217    class current_testbed:
1218        """
1219        Object for collecting the current testbed description.  The testbed
1220        description is saved to a file with the local testbed variables
1221        subsittuted line by line.
1222        """
1223        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1224            def tar_list_to_string(tl):
1225                if tl is None: return None
1226
1227                rv = ""
1228                for t in tl:
1229                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1230                            (t[0], os.path.basename(t[1]))
1231                return rv
1232
1233
1234            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1235            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1236            self.current_testbed = None
1237            self.testbed_file = None
1238
1239            self.def_expstart = \
1240                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1241            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1242            self.def_gwstart = \
1243                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1244            self.def_mgwstart = \
1245                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1246            self.def_gwimage = "FBSD61-TUNNEL2";
1247            self.def_gwtype = "pc";
1248            self.def_mgwcmd = '# '
1249            self.def_mgwcmdparams = ''
1250            self.def_gwcmd = '# '
1251            self.def_gwcmdparams = ''
1252
1253            self.eid = eid
1254            self.tmpdir = tmpdir
1255            # Convert fedkit and gateway kit (which are lists of tuples) into a
1256            # substituition string.
1257            self.fedkit = tar_list_to_string(fedkit)
1258            self.gatewaykit = tar_list_to_string(gatewaykit)
1259
1260        def __call__(self, line, master, allocated, tbparams):
1261            # Capture testbed topology descriptions
1262            if self.current_testbed == None:
1263                m = self.begin_testbed.match(line)
1264                if m != None:
1265                    self.current_testbed = m.group(1)
1266                    if self.current_testbed == None:
1267                        raise service_error(service_error.req,
1268                                "Bad request format (unnamed testbed)")
1269                    allocated[self.current_testbed] = \
1270                            allocated.get(self.current_testbed,0) + 1
1271                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1272                    if not os.path.exists(tb_dir):
1273                        try:
1274                            os.mkdir(tb_dir)
1275                        except IOError:
1276                            raise service_error(service_error.internal,
1277                                    "Cannot create %s" % tb_dir)
1278                    try:
1279                        self.testbed_file = open("%s/%s.%s.tcl" %
1280                                (tb_dir, self.eid, self.current_testbed), 'w')
1281                    except IOError:
1282                        self.testbed_file = None
1283                    return True
1284                else: return False
1285            else:
1286                m = self.end_testbed.match(line)
1287                if m != None:
1288                    if m.group(1) != self.current_testbed:
1289                        raise service_error(service_error.internal, 
1290                                "Mismatched testbed markers!?")
1291                    if self.testbed_file != None: 
1292                        self.testbed_file.close()
1293                        self.testbed_file = None
1294                    self.current_testbed = None
1295                elif self.testbed_file:
1296                    # Substitute variables and put the line into the local
1297                    # testbed file.
1298                    gwtype = tbparams[self.current_testbed].get(\
1299                            'connectortype', self.def_gwtype)
1300                    gwimage = tbparams[self.current_testbed].get(\
1301                            'connectorimage', self.def_gwimage)
1302                    mgwstart = tbparams[self.current_testbed].get(\
1303                            'masterconnectorstartcmd', self.def_mgwstart)
1304                    mexpstart = tbparams[self.current_testbed].get(\
1305                            'masternodestartcmd', self.def_mexpstart)
1306                    gwstart = tbparams[self.current_testbed].get(\
1307                            'slaveconnectorstartcmd', self.def_gwstart)
1308                    expstart = tbparams[self.current_testbed].get(\
1309                            'slavenodestartcmd', self.def_expstart)
1310                    project = tbparams[self.current_testbed].get('project')
1311                    gwcmd = tbparams[self.current_testbed].get(\
1312                            'slaveconnectorcmd', self.def_gwcmd)
1313                    gwcmdparams = tbparams[self.current_testbed].get(\
1314                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1315                    mgwcmd = tbparams[self.current_testbed].get(\
1316                            'masterconnectorcmd', self.def_gwcmd)
1317                    mgwcmdparams = tbparams[self.current_testbed].get(\
1318                            'masterconnectorcmdparams', self.def_gwcmdparams)
1319                    line = re.sub("GWTYPE", gwtype, line)
1320                    line = re.sub("GWIMAGE", gwimage, line)
1321                    if self.current_testbed == master:
1322                        line = re.sub("GWSTART", mgwstart, line)
1323                        line = re.sub("EXPSTART", mexpstart, line)
1324                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1325                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1326                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1327                    else:
1328                        line = re.sub("GWSTART", gwstart, line)
1329                        line = re.sub("EXPSTART", expstart, line)
1330                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1331                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1332                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1333                    #These expansions contain EID and PROJDIR.  NB these are
1334                    # local fedkit and gatewaykit, which are strings.
1335                    if self.fedkit:
1336                        line = re.sub("FEDKIT", self.fedkit, line)
1337                    if self.gatewaykit:
1338                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1339                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1340                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1341                    line = re.sub("EID", self.eid, line)
1342                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1343                            (project, self.eid), line)
1344                    print >>self.testbed_file, line
1345                return True
1346
1347    class allbeds:
1348        """
1349        Process the Allbeds section.  Get access to each federant and save the
1350        parameters in tbparams
1351        """
1352        def __init__(self, get_access):
1353            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1354            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1355            self.in_allbeds = False
1356            self.get_access = get_access
1357
1358        def __call__(self, line, user, tbparams, master, export_project,
1359                access_user):
1360            # Testbed access parameters
1361            if not self.in_allbeds:
1362                if self.begin_allbeds.match(line):
1363                    self.in_allbeds = True
1364                    return True
1365                else:
1366                    return False
1367            else:
1368                if self.end_allbeds.match(line):
1369                    self.in_allbeds = False
1370                else:
1371                    nodes = line.split('|')
1372                    tb = nodes.pop(0)
1373                    self.get_access(tb, nodes, user, tbparams, master,
1374                            export_project, access_user)
1375                return True
1376
1377    class gateways:
1378        def __init__(self, eid, master, tmpdir, gw_pubkey,
1379                gw_secretkey, copy_file, fedkit):
1380            self.begin_gateways = \
1381                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1382            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1383            self.current_gateways = None
1384            self.control_gateway = None
1385            self.active_end = { }
1386
1387            self.eid = eid
1388            self.master = master
1389            self.tmpdir = tmpdir
1390            self.gw_pubkey_base = gw_pubkey
1391            self.gw_secretkey_base = gw_secretkey
1392
1393            self.copy_file = copy_file
1394            self.fedkit = fedkit
1395
1396
1397        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1398                active_end, tbparams, dtb, myname, desthost, type):
1399            """
1400            Produce a gateway configuration file from a gateways line.
1401            """
1402
1403            sproject = tbparams[gw].get('project', 'project')
1404            dproject = tbparams[dtb].get('project', 'project')
1405            sdomain = ".%s.%s%s" % (eid, sproject,
1406                    tbparams[gw].get('domain', ".example.com"))
1407            ddomain = ".%s.%s%s" % (eid, dproject,
1408                    tbparams[dtb].get('domain', ".example.com"))
1409            boss = tbparams[master].get('boss', "boss")
1410            fs = tbparams[master].get('fs', "fs")
1411            event_server = "%s%s" % \
1412                    (tbparams[gw].get('eventserver', "event_server"),
1413                            tbparams[gw].get('domain', "example.com"))
1414            remote_event_server = "%s%s" % \
1415                    (tbparams[dtb].get('eventserver', "event_server"),
1416                            tbparams[dtb].get('domain', "example.com"))
1417            seer_control = "%s%s" % \
1418                    (tbparams[gw].get('control', "control"), sdomain)
1419            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1420
1421            if self.fedkit:
1422                remote_script_dir = "/usr/local/federation/bin"
1423                local_script_dir = "/usr/local/federation/bin"
1424            else:
1425                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1426                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1427
1428            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1429            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1430            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1431
1432            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1433            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1434
1435            # translate to lower case so the `hostname` hack for specifying
1436            # configuration files works.
1437            conf_file = conf_file.lower();
1438            remote_conf_file = remote_conf_file.lower();
1439
1440            if dtb == master:
1441                active = "false"
1442            elif gw == master:
1443                active = "true"
1444            elif active_end.has_key('%s-%s' % (dtb, gw)):
1445                active = "false"
1446            else:
1447                active_end['%s-%s' % (gw, dtb)] = 1
1448                active = "true"
1449
1450            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1451            print >>gwconfig, "Active: %s" % active
1452            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1453            if tunnel_iface:
1454                print >>gwconfig, "Interface: %s" % tunnel_iface
1455            print >>gwconfig, "BossName: %s" % boss
1456            print >>gwconfig, "FsName: %s" % fs
1457            print >>gwconfig, "EventServerName: %s" % event_server
1458            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1459            print >>gwconfig, "SeerControl: %s" % seer_control
1460            print >>gwconfig, "Type: %s" % type
1461            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1462            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1463                    local_script_dir
1464            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1465            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1466            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1467                    (remote_conf_dir, remote_conf_file)
1468            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1469            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1470            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1471            gwconfig.close()
1472
1473            return active == "true"
1474
1475        def __call__(self, line, allocated, tbparams):
1476            # Process gateways
1477            if not self.current_gateways:
1478                m = self.begin_gateways.match(line)
1479                if m:
1480                    self.current_gateways = m.group(1)
1481                    if allocated.has_key(self.current_gateways):
1482                        # This test should always succeed
1483                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1484                        if not os.path.exists(tb_dir):
1485                            try:
1486                                os.mkdir(tb_dir)
1487                            except IOError:
1488                                raise service_error(service_error.internal,
1489                                        "Cannot create %s" % tb_dir)
1490                    else:
1491                        # XXX
1492                        self.log.error("[gateways]: Ignoring gateways for " + \
1493                                "unknown testbed %s" % self.current_gateways)
1494                        self.current_gateways = None
1495                    return True
1496                else:
1497                    return False
1498            else:
1499                m = self.end_gateways.match(line)
1500                if m :
1501                    if m.group(1) != self.current_gateways:
1502                        raise service_error(service_error.internal,
1503                                "Mismatched gateway markers!?")
1504                    if self.control_gateway:
1505                        try:
1506                            cc = open("%s/%s/client.conf" %
1507                                    (self.tmpdir, self.current_gateways), 'w')
1508                            print >>cc, "ControlGateway: %s" % \
1509                                    self.control_gateway
1510                            if tbparams[self.master].has_key('smbshare'):
1511                                print >>cc, "SMBSHare: %s" % \
1512                                        tbparams[self.master]['smbshare']
1513                            print >>cc, "ProjectUser: %s" % \
1514                                    tbparams[self.master]['user']
1515                            print >>cc, "ProjectName: %s" % \
1516                                    tbparams[self.master]['project']
1517                            print >>cc, "ExperimentID: %s/%s" % \
1518                                    ( tbparams[self.master]['project'], \
1519                                    self.eid )
1520                            cc.close()
1521                        except IOError:
1522                            raise service_error(service_error.internal,
1523                                    "Error creating client config")
1524                        # XXX: This seer specific file should disappear
1525                        try:
1526                            cc = open("%s/%s/seer.conf" %
1527                                    (self.tmpdir, self.current_gateways),
1528                                    'w')
1529                            if self.current_gateways != self.master:
1530                                print >>cc, "ControlNode: %s" % \
1531                                        self.control_gateway
1532                            print >>cc, "ExperimentID: %s/%s" % \
1533                                    ( tbparams[self.master]['project'], \
1534                                    self.eid )
1535                            cc.close()
1536                        except IOError:
1537                            raise service_error(service_error.internal,
1538                                    "Error creating seer config")
1539                    else:
1540                        debug.error("[gateways]: No control gateway for %s" %\
1541                                    self.current_gateways)
1542                    self.current_gateways = None
1543                else:
1544                    dtb, myname, desthost, type = line.split(" ")
1545
1546                    if type == "control" or type == "both":
1547                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1548                                self.eid, 
1549                                tbparams[self.current_gateways]['project'],
1550                                tbparams[self.current_gateways]['domain'])
1551                    try:
1552                        active = self.gateway_conf_file(self.current_gateways,
1553                                self.master, self.eid, self.gw_pubkey_base,
1554                                self.gw_secretkey_base,
1555                                self.active_end, tbparams, dtb, myname,
1556                                desthost, type)
1557                    except IOError, e:
1558                        raise service_error(service_error.internal,
1559                                "Failed to write config file for %s" % \
1560                                        self.current_gateway)
1561           
1562                    gw_pubkey = "%s/keys/%s" % \
1563                            (self.tmpdir, self.gw_pubkey_base)
1564                    gw_secretkey = "%s/keys/%s" % \
1565                            (self.tmpdir, self.gw_secretkey_base)
1566
1567                    pkfile = "%s/%s/%s" % \
1568                            ( self.tmpdir, self.current_gateways, 
1569                                    self.gw_pubkey_base)
1570                    skfile = "%s/%s/%s" % \
1571                            ( self.tmpdir, self.current_gateways, 
1572                                    self.gw_secretkey_base)
1573
1574                    if not os.path.exists(pkfile):
1575                        try:
1576                            self.copy_file(gw_pubkey, pkfile)
1577                        except IOError:
1578                            service_error(service_error.internal,
1579                                    "Failed to copy pubkey file")
1580
1581                    if active and not os.path.exists(skfile):
1582                        try:
1583                            self.copy_file(gw_secretkey, skfile)
1584                        except IOError:
1585                            service_error(service_error.internal,
1586                                    "Failed to copy secretkey file")
1587                return True
1588
1589    class shunt_to_file:
1590        """
1591        Simple class to write data between two regexps to a file.
1592        """
1593        def __init__(self, begin, end, filename):
1594            """
1595            Begin shunting on a match of begin, stop on end, send data to
1596            filename.
1597            """
1598            self.begin = re.compile(begin)
1599            self.end = re.compile(end)
1600            self.in_shunt = False
1601            self.file = None
1602            self.filename = filename
1603
1604        def __call__(self, line):
1605            """
1606            Call this on each line in the input that may be shunted.
1607            """
1608            if not self.in_shunt:
1609                if self.begin.match(line):
1610                    self.in_shunt = True
1611                    try:
1612                        self.file = open(self.filename, "w")
1613                    except:
1614                        self.file = None
1615                        raise
1616                    return True
1617                else:
1618                    return False
1619            else:
1620                if self.end.match(line):
1621                    if self.file: 
1622                        self.file.close()
1623                        self.file = None
1624                    self.in_shunt = False
1625                else:
1626                    if self.file:
1627                        print >>self.file, line
1628                return True
1629
1630    class shunt_to_list:
1631        """
1632        Same interface as shunt_to_file.  Data collected in self.list, one list
1633        element per line.
1634        """
1635        def __init__(self, begin, end):
1636            self.begin = re.compile(begin)
1637            self.end = re.compile(end)
1638            self.in_shunt = False
1639            self.list = [ ]
1640       
1641        def __call__(self, line):
1642            if not self.in_shunt:
1643                if self.begin.match(line):
1644                    self.in_shunt = True
1645                    return True
1646                else:
1647                    return False
1648            else:
1649                if self.end.match(line):
1650                    self.in_shunt = False
1651                else:
1652                    self.list.append(line)
1653                return True
1654
1655    class shunt_to_string:
1656        """
1657        Same interface as shunt_to_file.  Data collected in self.str, all in
1658        one string.
1659        """
1660        def __init__(self, begin, end):
1661            self.begin = re.compile(begin)
1662            self.end = re.compile(end)
1663            self.in_shunt = False
1664            self.str = ""
1665       
1666        def __call__(self, line):
1667            if not self.in_shunt:
1668                if self.begin.match(line):
1669                    self.in_shunt = True
1670                    return True
1671                else:
1672                    return False
1673            else:
1674                if self.end.match(line):
1675                    self.in_shunt = False
1676                else:
1677                    self.str += line
1678                return True
1679
1680    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1681            tbparams, tmpdir, alloc_log=None):
1682        started = { }           # Testbeds where a sub-experiment started
1683                                # successfully
1684
1685        # XXX
1686        fail_soft = False
1687
1688        log = alloc_log or self.log
1689
1690        thread_pool = self.thread_pool(self.nthreads)
1691        threads = [ ]
1692
1693        for tb in [ k for k in allocated.keys() if k != master]:
1694            # Create and start a thread to start the segment, and save it to
1695            # get the return value later
1696            thread_pool.wait_for_slot()
1697            t  = self.pooled_thread(\
1698                    target=self.start_segment(log=log,
1699                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1700                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1701                    pdata=thread_pool, trace_file=self.trace_file)
1702            threads.append(t)
1703            t.start()
1704
1705        # Wait until all finish
1706        thread_pool.wait_for_all_done()
1707
1708        # If none failed, start the master
1709        failed = [ t.getName() for t in threads if not t.rv ]
1710
1711        if len(failed) == 0:
1712            starter = self.start_segment(log=log, 
1713                    keyfile=self.ssh_privkey_file, debug=self.debug)
1714            if not starter(master, eid, tbparams, tmpdir):
1715                failed.append(master)
1716
1717        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1718        # If one failed clean up, unless fail_soft is set
1719        if failed:
1720            if not fail_soft:
1721                thread_pool.clear()
1722                for tb in succeeded:
1723                    # Create and start a thread to stop the segment
1724                    thread_pool.wait_for_slot()
1725                    t  = self.pooled_thread(\
1726                            target=self.stop_segment(log=log,
1727                                keyfile=self.ssh_privkey_file,
1728                                debug=self.debug), 
1729                            args=(tb, eid, tbparams), name=tb,
1730                            pdata=thread_pool, trace_file=self.trace_file)
1731                    t.start()
1732                # Wait until all finish
1733                thread_pool.wait_for_all_done()
1734
1735                # release the allocations
1736                for tb in tbparams.keys():
1737                    self.release_access(tb, tbparams[tb]['allocID'])
1738                # Remove the placeholder
1739                self.state_lock.acquire()
1740                self.state[eid]['experimentStatus'] = 'failed'
1741                if self.state_filename: self.write_state()
1742                self.state_lock.release()
1743
1744                #raise service_error(service_error.federant,
1745                #    "Swap in failed on %s" % ",".join(failed))
1746                log.error("Swap in failed on %s" % ",".join(failed))
1747                return
1748        else:
1749            log.info("[start_segment]: Experiment %s active" % eid)
1750
1751        log.debug("[start_experiment]: removing %s" % tmpdir)
1752
1753        # Walk up tmpdir, deleting as we go
1754        for path, dirs, files in os.walk(tmpdir, topdown=False):
1755            for f in files:
1756                os.remove(os.path.join(path, f))
1757            for d in dirs:
1758                os.rmdir(os.path.join(path, d))
1759        os.rmdir(tmpdir)
1760
1761        # Insert the experiment into our state and update the disk copy
1762        self.state_lock.acquire()
1763        self.state[expid]['experimentStatus'] = 'active'
1764        self.state[eid] = self.state[expid]
1765        if self.state_filename: self.write_state()
1766        self.state_lock.release()
1767        return
1768
1769    def create_experiment(self, req, fid):
1770        """
1771        The external interface to experiment creation called from the
1772        dispatcher.
1773
1774        Creates a working directory, splits the incoming description using the
1775        splitter script and parses out the avrious subsections using the
1776        lcasses above.  Once each sub-experiment is created, use pooled threads
1777        to instantiate them and start it all up.
1778        """
1779
1780        if not self.auth.check_attribute(fid, 'create'):
1781            raise service_error(service_error.access, "Create access denied")
1782
1783        try:
1784            tmpdir = tempfile.mkdtemp(prefix="split-")
1785        except IOError:
1786            raise service_error(service_error.internal, "Cannot create tmp dir")
1787
1788        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1789        gw_secretkey_base = "fed.%s" % self.ssh_type
1790        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1791        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1792        tclfile = tmpdir + "/experiment.tcl"
1793        tbparams = { }
1794        try:
1795            access_user = self.accessdb[fid]
1796        except KeyError:
1797            raise service_error(service_error.internal,
1798                    "Access map and authorizer out of sync in " + \
1799                            "create_experiment for fedid %s"  % fid)
1800
1801        pid = "dummy"
1802        gid = "dummy"
1803        try:
1804            os.mkdir(tmpdir+"/keys")
1805        except OSError:
1806            raise service_error(service_error.internal,
1807                    "Can't make temporary dir")
1808
1809        req = req.get('CreateRequestBody', None)
1810        if not req:
1811            raise service_error(service_error.req,
1812                    "Bad request format (no CreateRequestBody)")
1813        # The tcl parser needs to read a file so put the content into that file
1814        descr=req.get('experimentdescription', None)
1815        if descr:
1816            file_content=descr.get('ns2description', None)
1817            if file_content:
1818                try:
1819                    f = open(tclfile, 'w')
1820                    f.write(file_content)
1821                    f.close()
1822                except IOError:
1823                    raise service_error(service_error.internal,
1824                            "Cannot write temp experiment description")
1825            else:
1826                raise service_error(service_error.req, 
1827                        "Only ns2descriptions supported")
1828        else:
1829            raise service_error(service_error.req, "No experiment description")
1830
1831        # Generate an ID for the experiment (slice) and a certificate that the
1832        # allocator can use to prove they own it.  We'll ship it back through
1833        # the encrypted connection.
1834        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1835
1836        if req.has_key('experimentID') and \
1837                req['experimentID'].has_key('localname'):
1838            eid = req['experimentID']['localname']
1839            self.state_lock.acquire()
1840            while (self.state.has_key(eid)):
1841                eid += random.choice(string.ascii_letters)
1842            # Initial state
1843            self.state[eid] = {
1844                    'experimentID' : \
1845                            [ { 'localname' : eid }, {'fedid': expid } ],
1846                    'experimentStatus': 'starting',
1847                    'experimentAccess': { 'X509' : expcert },
1848                    'owner': fid,
1849                    'log' : [],
1850                }
1851            self.state[expid] = self.state[eid]
1852            if self.state_filename: self.write_state()
1853            self.state_lock.release()
1854        else:
1855            eid = self.exp_stem
1856            for i in range(0,5):
1857                eid += random.choice(string.ascii_letters)
1858            self.state_lock.acquire()
1859            while (self.state.has_key(eid)):
1860                eid = self.exp_stem
1861                for i in range(0,5):
1862                    eid += random.choice(string.ascii_letters)
1863            # Initial state
1864            self.state[eid] = {
1865                    'experimentID' : \
1866                            [ { 'localname' : eid }, {'fedid': expid } ],
1867                    'experimentStatus': 'starting',
1868                    'experimentAccess': { 'X509' : expcert },
1869                    'owner': fid,
1870                    'log' : [],
1871                }
1872            self.state[expid] = self.state[eid]
1873            if self.state_filename: self.write_state()
1874            self.state_lock.release()
1875
1876        try: 
1877            # This catches exceptions to clear the placeholder if necessary
1878            try:
1879                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1880            except ValueError:
1881                raise service_error(service_error.server_config, 
1882                        "Bad key type (%s)" % self.ssh_type)
1883
1884            user = req.get('user', None)
1885            if user == None:
1886                raise service_error(service_error.req, "No user")
1887
1888            master = req.get('master', None)
1889            if not master:
1890                raise service_error(service_error.req,
1891                        "No master testbed label")
1892            export_project = req.get('exportProject', None)
1893            if not export_project:
1894                raise service_error(service_error.req, "No export project")
1895           
1896            if self.splitter_url:
1897                self.log.debug("Calling remote splitter at %s" % \
1898                        self.splitter_url)
1899                split_data = self.remote_splitter(self.splitter_url,
1900                        file_content, master)
1901            else:
1902                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1903                    str(self.muxmax), '-m', master]
1904
1905                if self.fedkit:
1906                    tclcmd.append('-k')
1907
1908                if self.gatewaykit:
1909                    tclcmd.append('-K')
1910
1911                tclcmd.extend([pid, gid, eid, tclfile])
1912
1913                self.log.debug("running local splitter %s", " ".join(tclcmd))
1914                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True)
1915                split_data = tclparser.stdout
1916
1917            allocated = { }         # Testbeds we can access
1918            # Objects to parse the splitter output (defined above)
1919            parse_current_testbed = self.current_testbed(eid, tmpdir,
1920                    self.fedkit, self.gatewaykit)
1921            parse_allbeds = self.allbeds(self.get_access)
1922            parse_gateways = self.gateways(eid, master, tmpdir,
1923                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1924                    self.fedkit)
1925            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1926                        "^#\s+End\s+Vtopo")
1927            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1928                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1929            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1930                    "^#\s+End\s+tarfiles")
1931            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1932                    "^#\s+End\s+rpms")
1933
1934            # Working on the split data
1935            for line in split_data:
1936                line = line.rstrip()
1937                if parse_current_testbed(line, master, allocated, tbparams):
1938                    continue
1939                elif parse_allbeds(line, user, tbparams, master, export_project,
1940                        access_user):
1941                    continue
1942                elif parse_gateways(line, allocated, tbparams):
1943                    continue
1944                elif parse_vtopo(line):
1945                    continue
1946                elif parse_hostnames(line):
1947                    continue
1948                elif parse_tarfiles(line):
1949                    continue
1950                elif parse_rpms(line):
1951                    continue
1952                else:
1953                    raise service_error(service_error.internal, 
1954                            "Bad tcl parse? %s" % line)
1955            # Virtual topology and visualization
1956            vtopo = self.gentopo(parse_vtopo.str)
1957            if not vtopo:
1958                raise service_error(service_error.internal, 
1959                        "Failed to generate virtual topology")
1960
1961            vis = self.genviz(vtopo)
1962            if not vis:
1963                raise service_error(service_error.internal, 
1964                        "Failed to generate visualization")
1965
1966           
1967            # save federant information
1968            for k in allocated.keys():
1969                tbparams[k]['federant'] = {\
1970                        'name': [ { 'localname' : eid} ],\
1971                        'emulab': tbparams[k]['emulab'],\
1972                        'allocID' : tbparams[k]['allocID'],\
1973                        'master' : k == master,\
1974                    }
1975
1976            self.state_lock.acquire()
1977            self.state[eid]['vtopo'] = vtopo
1978            self.state[eid]['vis'] = vis
1979            self.state[expid]['federant'] = \
1980                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1981                        if tbparams[tb].has_key('federant') ]
1982            if self.state_filename: self.write_state()
1983            self.state_lock.release()
1984
1985            # Copy tarfiles and rpms needed at remote sites into a staging area
1986            try:
1987                if self.fedkit:
1988                    for t in self.fedkit:
1989                        parse_tarfiles.list.append(t[1])
1990                if self.gatewaykit:
1991                    for t in self.gatewaykit:
1992                        parse_tarfiles.list.append(t[1])
1993                for t in parse_tarfiles.list:
1994                    if not os.path.exists("%s/tarfiles" % tmpdir):
1995                        os.mkdir("%s/tarfiles" % tmpdir)
1996                    self.copy_file(t, "%s/tarfiles/%s" % \
1997                            (tmpdir, os.path.basename(t)))
1998                for r in parse_rpms.list:
1999                    if not os.path.exists("%s/rpms" % tmpdir):
2000                        os.mkdir("%s/rpms" % tmpdir)
2001                    self.copy_file(r, "%s/rpms/%s" % \
2002                            (tmpdir, os.path.basename(r)))
2003                # A null experiment file in case we need to create a remote
2004                # experiment from scratch
2005                f = open("%s/null.tcl" % tmpdir, "w")
2006                print >>f, """
2007set ns [new Simulator]
2008source tb_compat.tcl
2009
2010set a [$ns node]
2011
2012$ns rtproto Session
2013$ns run
2014"""
2015                f.close()
2016
2017            except IOError, e:
2018                raise service_error(service_error.internal, 
2019                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2020
2021        except service_error, e:
2022            # If something goes wrong in the parse (usually an access error)
2023            # clear the placeholder state.  From here on out the code delays
2024            # exceptions.  Failing at this point returns a fault to the remote
2025            # caller.
2026            self.state_lock.acquire()
2027            del self.state[eid]
2028            del self.state[expid]
2029            if self.state_filename: self.write_state()
2030            self.state_lock.release()
2031            raise e
2032
2033
2034        # Start the background swapper and return the starting state.  From
2035        # here on out, the state will stick around a while.
2036
2037        # Let users touch the state
2038        self.auth.set_attribute(fid, expid)
2039        self.auth.set_attribute(expid, expid)
2040        # Override fedids can manipulate state as well
2041        for o in self.overrides:
2042            self.auth.set_attribute(o, expid)
2043
2044        # Create a logger that logs to the experiment's state object as well as
2045        # to the main log file.
2046        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2047        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2048        # XXX: there should be a global one of these rather than repeating the
2049        # code.
2050        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2051                    '%d %b %y %H:%M:%S'))
2052        alloc_log.addHandler(h)
2053       
2054        # Start a thread to do the resource allocation
2055        t  = Thread(target=self.allocate_resources,
2056                args=(allocated, master, eid, expid, expcert, tbparams, 
2057                    tmpdir, alloc_log),
2058                name=eid)
2059        t.start()
2060
2061        rv = {
2062                'experimentID': [
2063                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2064                ],
2065                'experimentStatus': 'starting',
2066                'experimentAccess': { 'X509' : expcert }
2067            }
2068
2069        return rv
2070
2071    def check_experiment_access(self, fid, key):
2072        """
2073        Confirm that the fid has access to the experiment.  Though a request
2074        may be made in terms of a local name, the access attribute is always
2075        the experiment's fedid.
2076        """
2077        if not isinstance(key, fedid):
2078            self.state_lock.acquire()
2079            if self.state.has_key(key):
2080                if isinstance(self.state[key], dict):
2081                    try:
2082                        kl = [ f['fedid'] for f in \
2083                                self.state[key]['experimentID']\
2084                                    if f.has_key('fedid') ]
2085                    except KeyError:
2086                        self.state_lock.release()
2087                        raise service_error(service_error.internal, 
2088                                "No fedid for experiment %s when checking " +\
2089                                        "access(!?)" % key)
2090                    if len(kl) == 1:
2091                        key = kl[0]
2092                    else:
2093                        self.state_lock.release()
2094                        raise service_error(service_error.internal, 
2095                                "multiple fedids for experiment %s when " +\
2096                                        "checking access(!?)" % key)
2097                elif isinstance(self.state[key], str):
2098                    self.state_lock.release()
2099                    raise service_error(service_error.internal, 
2100                            ("experiment %s is placeholder.  " +\
2101                                    "Creation in progress or aborted oddly") \
2102                                    % key)
2103                else:
2104                    self.state_lock.release()
2105                    raise service_error(service_error.internal, 
2106                            "Unexpected state for %s" % key)
2107
2108            else:
2109                self.state_lock.release()
2110                raise service_error(service_error.access, "Access Denied")
2111            self.state_lock.release()
2112
2113        if self.auth.check_attribute(fid, key):
2114            return True
2115        else:
2116            raise service_error(service_error.access, "Access Denied")
2117
2118
2119
2120    def get_vtopo(self, req, fid):
2121        """
2122        Return the stored virtual topology for this experiment
2123        """
2124        rv = None
2125        state = None
2126
2127        req = req.get('VtopoRequestBody', None)
2128        if not req:
2129            raise service_error(service_error.req,
2130                    "Bad request format (no VtopoRequestBody)")
2131        exp = req.get('experiment', None)
2132        if exp:
2133            if exp.has_key('fedid'):
2134                key = exp['fedid']
2135                keytype = "fedid"
2136            elif exp.has_key('localname'):
2137                key = exp['localname']
2138                keytype = "localname"
2139            else:
2140                raise service_error(service_error.req, "Unknown lookup type")
2141        else:
2142            raise service_error(service_error.req, "No request?")
2143
2144        self.check_experiment_access(fid, key)
2145
2146        self.state_lock.acquire()
2147        if self.state.has_key(key):
2148            if self.state[key].has_key('vtopo'):
2149                rv = { 'experiment' : {keytype: key },\
2150                        'vtopo': self.state[key]['vtopo'],\
2151                    }
2152            else:
2153                state = self.state[key]['experimentStatus']
2154        self.state_lock.release()
2155
2156        if rv: return rv
2157        else: 
2158            if state:
2159                raise service_error(service_error.partial, 
2160                        "Not ready: %s" % state)
2161            else:
2162                raise service_error(service_error.req, "No such experiment")
2163
2164    def get_vis(self, req, fid):
2165        """
2166        Return the stored visualization for this experiment
2167        """
2168        rv = None
2169        state = None
2170
2171        req = req.get('VisRequestBody', None)
2172        if not req:
2173            raise service_error(service_error.req,
2174                    "Bad request format (no VisRequestBody)")
2175        exp = req.get('experiment', None)
2176        if exp:
2177            if exp.has_key('fedid'):
2178                key = exp['fedid']
2179                keytype = "fedid"
2180            elif exp.has_key('localname'):
2181                key = exp['localname']
2182                keytype = "localname"
2183            else:
2184                raise service_error(service_error.req, "Unknown lookup type")
2185        else:
2186            raise service_error(service_error.req, "No request?")
2187
2188        self.check_experiment_access(fid, key)
2189
2190        self.state_lock.acquire()
2191        if self.state.has_key(key):
2192            if self.state[key].has_key('vis'):
2193                rv =  { 'experiment' : {keytype: key },\
2194                        'vis': self.state[key]['vis'],\
2195                        }
2196            else:
2197                state = self.state[key]['experimentStatus']
2198        self.state_lock.release()
2199
2200        if rv: return rv
2201        else:
2202            if state:
2203                raise service_error(service_error.partial, 
2204                        "Not ready: %s" % state)
2205            else:
2206                raise service_error(service_error.req, "No such experiment")
2207
2208    def clean_info_response(self, rv):
2209        """
2210        Remove the information in the experiment's state object that is not in
2211        the info response.
2212        """
2213        # Remove the owner info (should always be there, but...)
2214        if rv.has_key('owner'): del rv['owner']
2215
2216        # Convert the log into the allocationLog parameter and remove the
2217        # log entry (with defensive programming)
2218        if rv.has_key('log'):
2219            rv['allocationLog'] = "".join(rv['log'])
2220            del rv['log']
2221        else:
2222            rv['allocationLog'] = ""
2223
2224        if rv['experimentStatus'] != 'active':
2225            if rv.has_key('federant'): del rv['federant']
2226        else:
2227            # remove the allocationID info from each federant
2228            for f in rv.get('federant', []):
2229                if f.has_key('allocID'): del f['allocID']
2230        return rv
2231
2232    def get_info(self, req, fid):
2233        """
2234        Return all the stored info about this experiment
2235        """
2236        rv = None
2237
2238        req = req.get('InfoRequestBody', None)
2239        if not req:
2240            raise service_error(service_error.req,
2241                    "Bad request format (no InfoRequestBody)")
2242        exp = req.get('experiment', None)
2243        if exp:
2244            if exp.has_key('fedid'):
2245                key = exp['fedid']
2246                keytype = "fedid"
2247            elif exp.has_key('localname'):
2248                key = exp['localname']
2249                keytype = "localname"
2250            else:
2251                raise service_error(service_error.req, "Unknown lookup type")
2252        else:
2253            raise service_error(service_error.req, "No request?")
2254
2255        self.check_experiment_access(fid, key)
2256
2257        # The state may be massaged by the service function that called
2258        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2259        # state.
2260        self.state_lock.acquire()
2261        if self.state.has_key(key):
2262            rv = copy.deepcopy(self.state[key])
2263        self.state_lock.release()
2264
2265        if rv:
2266            return self.clean_info_response(rv)
2267        else:
2268            raise service_error(service_error.req, "No such experiment")
2269
2270    def get_multi_info(self, req, fid):
2271        """
2272        Return all the stored info that this fedid can access
2273        """
2274        rv = { 'info': [ ] }
2275
2276        self.state_lock.acquire()
2277        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2278            self.check_experiment_access(fid, key)
2279
2280            if self.state.has_key(key):
2281                e = copy.deepcopy(self.state[key])
2282                e = self.clean_info_response(e)
2283                rv['info'].append(e)
2284        self.state_lock.release()
2285        return rv
2286
2287
2288    def terminate_experiment(self, req, fid):
2289        """
2290        Swap this experiment out on the federants and delete the shared
2291        information
2292        """
2293        tbparams = { }
2294        req = req.get('TerminateRequestBody', None)
2295        if not req:
2296            raise service_error(service_error.req,
2297                    "Bad request format (no TerminateRequestBody)")
2298        force = req.get('force', False)
2299        exp = req.get('experiment', None)
2300        if exp:
2301            if exp.has_key('fedid'):
2302                key = exp['fedid']
2303                keytype = "fedid"
2304            elif exp.has_key('localname'):
2305                key = exp['localname']
2306                keytype = "localname"
2307            else:
2308                raise service_error(service_error.req, "Unknown lookup type")
2309        else:
2310            raise service_error(service_error.req, "No request?")
2311
2312        self.check_experiment_access(fid, key)
2313
2314        dealloc_list = [ ]
2315
2316
2317        # Create a logger that logs to the dealloc_list as well as to the main
2318        # log file.
2319        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2320        h = logging.StreamHandler(self.list_log(dealloc_list))
2321        # XXX: there should be a global one of these rather than repeating the
2322        # code.
2323        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2324                    '%d %b %y %H:%M:%S'))
2325        dealloc_log.addHandler(h)
2326
2327        self.state_lock.acquire()
2328        fed_exp = self.state.get(key, None)
2329
2330        if fed_exp:
2331            # This branch of the conditional holds the lock to generate a
2332            # consistent temporary tbparams variable to deallocate experiments.
2333            # It releases the lock to do the deallocations and reacquires it to
2334            # remove the experiment state when the termination is complete.
2335
2336            # First make sure that the experiment creation is complete.
2337            status = fed_exp.get('experimentStatus', None)
2338
2339            if status:
2340                if status in ('starting', 'terminating'):
2341                    if not force:
2342                        self.state_lock.release()
2343                        raise service_error(service_error.partial, 
2344                                'Experiment still being created or destroyed')
2345                    else:
2346                        self.log.warning('Experiment in %s state ' % status + \
2347                                'being terminated by force.')
2348            else:
2349                # No status??? trouble
2350                self.state_lock.release()
2351                raise service_error(service_error.internal,
2352                        "Experiment has no status!?")
2353
2354            ids = []
2355            #  experimentID is a list of dicts that are self-describing
2356            #  identifiers.  This finds all the fedids and localnames - the
2357            #  keys of self.state - and puts them into ids.
2358            for id in fed_exp.get('experimentID', []):
2359                if id.has_key('fedid'): ids.append(id['fedid'])
2360                if id.has_key('localname'): ids.append(id['localname'])
2361
2362            # Construct enough of the tbparams to make the stop_segment calls
2363            # work
2364            for fed in fed_exp.get('federant', []):
2365                try:
2366                    for e in fed['name']:
2367                        eid = e.get('localname', None)
2368                        if eid: break
2369                    else:
2370                        continue
2371
2372                    p = fed['emulab']['project']
2373
2374                    project = p['name']['localname']
2375                    tb = p['testbed']['localname']
2376                    user = p['user'][0]['userID']['localname']
2377
2378                    domain = fed['emulab']['domain']
2379                    host  = fed['emulab']['ops']
2380                    aid = fed['allocID']
2381                except KeyError, e:
2382                    continue
2383                tbparams[tb] = {\
2384                        'user': user,\
2385                        'domain': domain,\
2386                        'project': project,\
2387                        'host': host,\
2388                        'eid': eid,\
2389                        'aid': aid,\
2390                    }
2391                print "%s %s" % (tb, tbparams[tb])
2392            fed_exp['experimentStatus'] = 'terminating'
2393            if self.state_filename: self.write_state()
2394            self.state_lock.release()
2395
2396            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2397            # then completes, so we can't wait if nothing starts.  So, no
2398            # tbparams, no start.
2399            if len(tbparams) > 0:
2400                thread_pool = self.thread_pool(self.nthreads)
2401                for tb in tbparams.keys():
2402                    # Create and start a thread to stop the segment
2403                    thread_pool.wait_for_slot()
2404                    t  = self.pooled_thread(\
2405                            target=self.stop_segment(log=dealloc_log,
2406                                keyfile=self.ssh_privkey_file, debug=self.debug), 
2407                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2408                            pdata=thread_pool, trace_file=self.trace_file)
2409                    t.start()
2410                # Wait for completions
2411                thread_pool.wait_for_all_done()
2412
2413            # release the allocations (failed experiments have done this
2414            # already, and starting experiments may be in odd states, so we
2415            # ignore errors releasing those allocations
2416            try: 
2417                for tb in tbparams.keys():
2418                    self.release_access(tb, tbparams[tb]['aid'])
2419            except service_error, e:
2420                if status != 'failed' and not force:
2421                    raise e
2422
2423            # Remove the terminated experiment
2424            self.state_lock.acquire()
2425            for id in ids:
2426                if self.state.has_key(id): del self.state[id]
2427
2428            if self.state_filename: self.write_state()
2429            self.state_lock.release()
2430
2431            return { 
2432                    'experiment': exp , 
2433                    'deallocationLog': "".join(dealloc_list),
2434                    }
2435        else:
2436            # Don't forget to release the lock
2437            self.state_lock.release()
2438            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.