source: fedd/federation/experiment_control.py @ 9beaf7c

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 9beaf7c was f9ef40b, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint: writing config files

  • Property mode set to 100644
File size: 123.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_Ns2Split = service_caller('Ns2Split')
200
201    def __init__(self, config=None, auth=None):
202        """
203        Intialize the various attributes, most from the config object
204        """
205
206        def parse_tarfile_list(tf):
207            """
208            Parse a tarfile list from the configuration.  This is a set of
209            paths and tarfiles separated by spaces.
210            """
211            rv = [ ]
212            if tf is not None:
213                tl = tf.split()
214                while len(tl) > 1:
215                    p, t = tl[0:2]
216                    del tl[0:2]
217                    rv.append((p, t))
218            return rv
219
220        self.thread_with_rv = experiment_control_local.pooled_thread
221        self.thread_pool = experiment_control_local.thread_pool
222        self.list_log = experiment_control_local.list_log
223
224        self.cert_file = config.get("experiment_control", "cert_file")
225        if self.cert_file:
226            self.cert_pwd = config.get("experiment_control", "cert_pwd")
227        else:
228            self.cert_file = config.get("globals", "cert_file")
229            self.cert_pwd = config.get("globals", "cert_pwd")
230
231        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
232                or config.get("globals", "trusted_certs")
233
234        self.repodir = config.get("experiment_control", "repodir")
235
236        self.exp_stem = "fed-stem"
237        self.log = logging.getLogger("fedd.experiment_control")
238        set_log_level(config, "experiment_control", self.log)
239        self.muxmax = 2
240        self.nthreads = 2
241        self.randomize_experiments = False
242
243        self.splitter = None
244        self.ssh_keygen = "/usr/bin/ssh-keygen"
245        self.ssh_identity_file = None
246
247
248        self.debug = config.getboolean("experiment_control", "create_debug")
249        self.state_filename = config.get("experiment_control", 
250                "experiment_state")
251        self.splitter_url = config.get("experiment_control", "splitter_uri")
252        self.fedkit = parse_tarfile_list(\
253                config.get("experiment_control", "fedkit"))
254        self.gatewaykit = parse_tarfile_list(\
255                config.get("experiment_control", "gatewaykit"))
256        accessdb_file = config.get("experiment_control", "accessdb")
257
258        self.ssh_pubkey_file = config.get("experiment_control", 
259                "ssh_pubkey_file")
260        self.ssh_privkey_file = config.get("experiment_control",
261                "ssh_privkey_file")
262        # NB for internal master/slave ops, not experiment setup
263        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
264
265        self.overrides = set([])
266        ovr = config.get('experiment_control', 'overrides')
267        if ovr:
268            for o in ovr.split(","):
269                o = o.strip()
270                if o.startswith('fedid:'): o = o[len('fedid:'):]
271                self.overrides.add(fedid(hexstr=o))
272
273        self.state = { }
274        self.state_lock = Lock()
275        self.tclsh = "/usr/local/bin/otclsh"
276        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
277                config.get("experiment_control", "tcl_splitter",
278                        "/usr/testbed/lib/ns2ir/parse.tcl")
279        mapdb_file = config.get("experiment_control", "mapdb")
280        self.trace_file = sys.stderr
281
282        self.def_expstart = \
283                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
284                "/tmp/federate";
285        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
286                "FEDDIR/hosts";
287        self.def_gwstart = \
288                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
289                "/tmp/bridge.log";
290        self.def_mgwstart = \
291                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
292                "/tmp/bridge.log";
293        self.def_gwimage = "FBSD61-TUNNEL2";
294        self.def_gwtype = "pc";
295        self.local_access = { }
296
297        if auth:
298            self.auth = auth
299        else:
300            self.log.error(\
301                    "[access]: No authorizer initialized, creating local one.")
302            auth = authorizer()
303
304
305        if self.ssh_pubkey_file:
306            try:
307                f = open(self.ssh_pubkey_file, 'r')
308                self.ssh_pubkey = f.read()
309                f.close()
310            except IOError:
311                raise service_error(service_error.internal,
312                        "Cannot read sshpubkey")
313        else:
314            raise service_error(service_error.internal, 
315                    "No SSH public key file?")
316
317        if not self.ssh_privkey_file:
318            raise service_error(service_error.internal, 
319                    "No SSH public key file?")
320
321
322        if mapdb_file:
323            self.read_mapdb(mapdb_file)
324        else:
325            self.log.warn("[experiment_control] No testbed map, using defaults")
326            self.tbmap = { 
327                    'deter':'https://users.isi.deterlab.net:23235',
328                    'emulab':'https://users.isi.deterlab.net:23236',
329                    'ucb':'https://users.isi.deterlab.net:23237',
330                    }
331
332        if accessdb_file:
333                self.read_accessdb(accessdb_file)
334        else:
335            raise service_error(service_error.internal,
336                    "No accessdb specified in config")
337
338        # Grab saved state.  OK to do this w/o locking because it's read only
339        # and only one thread should be in existence that can see self.state at
340        # this point.
341        if self.state_filename:
342            self.read_state()
343
344        # Dispatch tables
345        self.soap_services = {\
346                'Create': soap_handler('Create', self.new_create_experiment),
347                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
348                'Vis': soap_handler('Vis', self.get_vis),
349                'Info': soap_handler('Info', self.get_info),
350                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
351                'Terminate': soap_handler('Terminate', 
352                    self.terminate_experiment),
353        }
354
355        self.xmlrpc_services = {\
356                'Create': xmlrpc_handler('Create', self.new_create_experiment),
357                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
358                'Vis': xmlrpc_handler('Vis', self.get_vis),
359                'Info': xmlrpc_handler('Info', self.get_info),
360                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
361                'Terminate': xmlrpc_handler('Terminate',
362                    self.terminate_experiment),
363        }
364
365    def copy_file(self, src, dest, size=1024):
366        """
367        Exceedingly simple file copy.
368        """
369        s = open(src,'r')
370        d = open(dest, 'w')
371
372        buf = "x"
373        while buf != "":
374            buf = s.read(size)
375            d.write(buf)
376        s.close()
377        d.close()
378
379    # Call while holding self.state_lock
380    def write_state(self):
381        """
382        Write a new copy of experiment state after copying the existing state
383        to a backup.
384
385        State format is a simple pickling of the state dictionary.
386        """
387        if os.access(self.state_filename, os.W_OK):
388            self.copy_file(self.state_filename, \
389                    "%s.bak" % self.state_filename)
390        try:
391            f = open(self.state_filename, 'w')
392            pickle.dump(self.state, f)
393        except IOError, e:
394            self.log.error("Can't write file %s: %s" % \
395                    (self.state_filename, e))
396        except pickle.PicklingError, e:
397            self.log.error("Pickling problem: %s" % e)
398        except TypeError, e:
399            self.log.error("Pickling problem (TypeError): %s" % e)
400
401    # Call while holding self.state_lock
402    def read_state(self):
403        """
404        Read a new copy of experiment state.  Old state is overwritten.
405
406        State format is a simple pickling of the state dictionary.
407        """
408       
409        def get_experiment_id(state):
410            """
411            Pull the fedid experimentID out of the saved state.  This is kind
412            of a gross walk through the dict.
413            """
414
415            if state.has_key('experimentID'):
416                for e in state['experimentID']:
417                    if e.has_key('fedid'):
418                        return e['fedid']
419                else:
420                    return None
421            else:
422                return None
423
424        def get_alloc_ids(state):
425            """
426            Pull the fedids of the identifiers of each allocation from the
427            state.  Again, a dict dive that's best isolated.
428            """
429
430            return [ f['allocID']['fedid'] 
431                    for f in state.get('federant',[]) \
432                        if f.has_key('allocID') and \
433                            f['allocID'].has_key('fedid')]
434
435
436        try:
437            f = open(self.state_filename, "r")
438            self.state = pickle.load(f)
439            self.log.debug("[read_state]: Read state from %s" % \
440                    self.state_filename)
441        except IOError, e:
442            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
443                    % (self.state_filename, e))
444        except pickle.UnpicklingError, e:
445            self.log.warning(("[read_state]: No saved state: " + \
446                    "Unpickling failed: %s") % e)
447       
448        for s in self.state.values():
449            try:
450
451                eid = get_experiment_id(s)
452                if eid : 
453                    # Give the owner rights to the experiment
454                    self.auth.set_attribute(s['owner'], eid)
455                    # And holders of the eid as well
456                    self.auth.set_attribute(eid, eid)
457                    # allow overrides to control experiments as well
458                    for o in self.overrides:
459                        self.auth.set_attribute(o, eid)
460                    # Set permissions to allow reading of the software repo, if
461                    # any, as well.
462                    for a in get_alloc_ids(s):
463                        self.auth.set_attribute(a, 'repo/%s' % eid)
464                else:
465                    raise KeyError("No experiment id")
466            except KeyError, e:
467                self.log.warning("[read_state]: State ownership or identity " +\
468                        "misformatted in %s: %s" % (self.state_filename, e))
469
470
471    def read_accessdb(self, accessdb_file):
472        """
473        Read the mapping from fedids that can create experiments to their name
474        in the 3-level access namespace.  All will be asserted from this
475        testbed and can include the local username and porject that will be
476        asserted on their behalf by this fedd.  Each fedid is also added to the
477        authorization system with the "create" attribute.
478        """
479        self.accessdb = {}
480        # These are the regexps for parsing the db
481        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
482        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
483                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
484        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
485                "\s*->\s*(" + name_expr + ")\s*$")
486        lineno = 0
487
488        # Parse the mappings and store in self.authdb, a dict of
489        # fedid -> (proj, user)
490        try:
491            f = open(accessdb_file, "r")
492            for line in f:
493                lineno += 1
494                line = line.strip()
495                if len(line) == 0 or line.startswith('#'):
496                    continue
497                m = project_line.match(line)
498                if m:
499                    fid = fedid(hexstr=m.group(1))
500                    project, user = m.group(2,3)
501                    if not self.accessdb.has_key(fid):
502                        self.accessdb[fid] = []
503                    self.accessdb[fid].append((project, user))
504                    continue
505
506                m = user_line.match(line)
507                if m:
508                    fid = fedid(hexstr=m.group(1))
509                    project = None
510                    user = m.group(2)
511                    if not self.accessdb.has_key(fid):
512                        self.accessdb[fid] = []
513                    self.accessdb[fid].append((project, user))
514                    continue
515                self.log.warn("[experiment_control] Error parsing access " +\
516                        "db %s at line %d" %  (accessdb_file, lineno))
517        except IOError:
518            raise service_error(service_error.internal,
519                    "Error opening/reading %s as experiment " +\
520                            "control accessdb" %  accessdb_file)
521        f.close()
522
523        # Initialize the authorization attributes
524        for fid in self.accessdb.keys():
525            self.auth.set_attribute(fid, 'create')
526
527    def read_mapdb(self, file):
528        """
529        Read a simple colon separated list of mappings for the
530        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
531        """
532
533        self.tbmap = { }
534        lineno =0
535        try:
536            f = open(file, "r")
537            for line in f:
538                lineno += 1
539                line = line.strip()
540                if line.startswith('#') or len(line) == 0:
541                    continue
542                try:
543                    label, url = line.split(':', 1)
544                    self.tbmap[label] = url
545                except ValueError, e:
546                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
547                            "map db: %s %s" % (lineno, line, e))
548        except IOError, e:
549            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
550                    "open %s: %s" % (file, e))
551        f.close()
552
553    class emulab_segment:
554        def __init__(self, log=None, keyfile=None, debug=False):
555            self.log = log or logging.getLogger(\
556                    'fedd.experiment_control.emulab_segment')
557            self.ssh_privkey_file = keyfile
558            self.debug = debug
559            self.ssh_exec="/usr/bin/ssh"
560            self.scp_exec = "/usr/bin/scp"
561            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
562
563        def scp_file(self, file, user, host, dest=""):
564            """
565            scp a file to the remote host.  If debug is set the action is only
566            logged.
567            """
568
569            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
570                    '-o', 'StrictHostKeyChecking yes', '-i', 
571                    self.ssh_privkey_file, file, 
572                    "%s@%s:%s" % (user, host, dest)]
573            rv = 0
574
575            try:
576                dnull = open("/dev/null", "w")
577            except IOError:
578                self.log.debug("[ssh_file]: failed to open " + \
579                        "/dev/null for redirect")
580                dnull = Null
581
582            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
583            if not self.debug:
584                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
585                        close_fds=True)
586
587            return rv == 0
588
589        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
590            """
591            Run a remote command on host as user.  If debug is set, the action
592            is only logged.  Commands are run without stdin, to avoid stray
593            SIGTTINs.
594            """
595            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
596                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
597                    (self.ssh_exec, self.ssh_privkey_file, 
598                            user, host, cmd)
599
600            try:
601                dnull = open("/dev/null", "w")
602            except IOError:
603                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
604                        "for redirect")
605                dnull = Null
606
607            self.log.debug("[ssh_cmd]: %s" % sh_str)
608            if not self.debug:
609                if dnull:
610                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
611                            close_fds=True)
612                else:
613                    sub = Popen(sh_str, shell=True,
614                            close_fds=True)
615                if timeout:
616                    i = 0
617                    rv = sub.poll()
618                    while i < timeout:
619                        if rv is not None: break
620                        else:
621                            time.sleep(1)
622                            rv = sub.poll()
623                            i += 1
624                    else:
625                        self.log.debug("Process exceeded runtime: %s" % sh_str)
626                        os.kill(sub.pid, signal.SIGKILL)
627                        raise self.ssh_cmd_timeout();
628                    return rv == 0
629                else:
630                    return sub.wait() == 0
631            else:
632                if timeout == 0:
633                    self.log.debug("debug timeout raised on %s " % sh_str)
634                    raise self.ssh_cmd_timeout()
635                else:
636                    return True
637
638    class start_segment(emulab_segment):
639        def __init__(self, log=None, keyfile=None, debug=False):
640            experiment_control_local.emulab_segment.__init__(self,
641                    log=log, keyfile=keyfile, debug=debug)
642
643        def create_config_tree(self, src_dir, dest_dir, script):
644            """
645            Append commands to script that will create the directory hierarchy
646            on the remote federant.
647            """
648
649            if os.path.isdir(src_dir):
650                print >>script, "mkdir -p %s" % dest_dir
651                print >>script, "chmod 770 %s" % dest_dir
652
653                for f in os.listdir(src_dir):
654                    if os.path.isdir(f):
655                        self.create_config_tree("%s/%s" % (src_dir, f), 
656                                "%s/%s" % (dest_dir, f), script)
657            else:
658                self.log.debug("[create_config_tree]: Not a directory: %s" \
659                        % src_dir)
660
661        def ship_configs(self, host, user, src_dir, dest_dir):
662            """
663            Copy federant-specific configuration files to the federant.
664            """
665            for f in os.listdir(src_dir):
666                if os.path.isdir(f):
667                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
668                            "%s/%s" % (dest_dir, f)):
669                        return False
670                else:
671                    if not self.scp_file("%s/%s" % (src_dir, f), 
672                            user, host, dest_dir):
673                        return False
674            return True
675
676        def get_state(self, user, host, tb, pid, eid):
677            # command to test experiment state
678            expinfo_exec = "/usr/testbed/bin/expinfo" 
679            # Regular expressions to parse the expinfo response
680            state_re = re.compile("State:\s+(\w+)")
681            no_exp_re = re.compile("^No\s+such\s+experiment")
682            swapping_re = re.compile("^No\s+information\s+available.")
683            state = None    # Experiment state parsed from expinfo
684            # The expinfo ssh command.  Note the identity restriction to use
685            # only the identity provided in the pubkey given.
686            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
687                    'StrictHostKeyChecking yes', '-i', 
688                    self.ssh_privkey_file, "%s@%s" % (user, host), 
689                    expinfo_exec, pid, eid]
690
691            dev_null = None
692            try:
693                dev_null = open("/dev/null", "a")
694            except IOError, e:
695                self.log.error("[get_state]: can't open /dev/null: %s" %e)
696
697            if self.debug:
698                state = 'swapped'
699                rv = 0
700            else:
701                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
702                        close_fds=True)
703                for line in status.stdout:
704                    m = state_re.match(line)
705                    if m: state = m.group(1)
706                    else:
707                        for reg, st in ((no_exp_re, "none"),
708                                (swapping_re, "swapping")):
709                            m = reg.match(line)
710                            if m: state = st
711                rv = status.wait()
712
713            # If the experiment is not present the subcommand returns a
714            # non-zero return value.  If we successfully parsed a "none"
715            # outcome, ignore the return code.
716            if rv != 0 and state != 'none':
717                raise service_error(service_error.internal,
718                        "Cannot get status of segment %s:%s/%s" % \
719                                (tb, pid, eid))
720            elif state not in ('active', 'swapped', 'swapping', 'none'):
721                raise service_error(service_error.internal,
722                        "Cannot get status of segment %s:%s/%s" % \
723                                (tb, pid, eid))
724            else: return state
725
726
727        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
728            """
729            Start a sub-experiment on a federant.
730
731            Get the current state, modify or create as appropriate, ship data
732            and configs and start the experiment.  There are small ordering
733            differences based on the initial state of the sub-experiment.
734            """
735            # ops node in the federant
736            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
737            user = tbparams[tb]['user']     # federant user
738            pid = tbparams[tb]['project']   # federant project
739            # XXX
740            base_confs = ( "hosts",)
741            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
742            # Configuration directories on the remote machine
743            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
744            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
745            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
746
747            state = self.get_state(user, host, tb, pid, eid)
748
749            self.log.debug("[start_segment]: %s: %s" % (tb, state))
750            self.log.info("[start_segment]:transferring experiment to %s" % tb)
751
752            if not self.scp_file("%s/%s/%s" % \
753                    (tmpdir, tb, tclfile), user, host):
754                return False
755           
756            if state == 'none':
757                # Create a null copy of the experiment so that we capture any
758                # logs there if the modify fails.  Emulab software discards the
759                # logs from a failed startexp
760                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
761                    return False
762                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
763                timedout = False
764                try:
765                    if not self.ssh_cmd(user, host,
766                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
767                            "-e %s null.tcl") % (pid, eid), "startexp",
768                            timeout=60 * 10):
769                        return False
770                except self.ssh_cmd_timeout:
771                    timedout = True
772
773                if timedout:
774                    state = self.get_state(user, host, tb, pid, eid)
775                    if state != "swapped":
776                        return False
777
778           
779            # Open up a temporary file to contain a script for setting up the
780            # filespace for the new experiment.
781            self.log.info("[start_segment]: creating script file")
782            try:
783                sf, scriptname = tempfile.mkstemp()
784                scriptfile = os.fdopen(sf, 'w')
785            except IOError:
786                return False
787
788            scriptbase = os.path.basename(scriptname)
789
790            # Script the filesystem changes
791            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
792            # Clear and create the tarfiles and rpm directories
793            for d in (tarfiles_dir, rpms_dir):
794                print >>scriptfile, "/bin/rm -rf %s/*" % d
795                print >>scriptfile, "mkdir -p %s" % d
796            print >>scriptfile, 'mkdir -p %s' % proj_dir
797            self.create_config_tree("%s/%s" % (tmpdir, tb),
798                    proj_dir, scriptfile)
799            if os.path.isdir("%s/tarfiles" % tmpdir):
800                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
801                        scriptfile)
802            if os.path.isdir("%s/rpms" % tmpdir):
803                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
804                        scriptfile)
805            print >>scriptfile, "rm -f %s" % scriptbase
806            scriptfile.close()
807
808            # Move the script to the remote machine
809            # XXX: could collide tempfile names on the remote host
810            if self.scp_file(scriptname, user, host, scriptbase):
811                os.remove(scriptname)
812            else:
813                return False
814
815            # Execute the script (and the script's last line deletes it)
816            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
817                return False
818
819            for f in base_confs:
820                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
821                        "%s/%s" % (proj_dir, f)):
822                    return False
823            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
824                    proj_dir):
825                return False
826            if os.path.isdir("%s/tarfiles" % tmpdir):
827                if not self.ship_configs(host, user,
828                        "%s/tarfiles" % tmpdir, tarfiles_dir):
829                    return False
830            if os.path.isdir("%s/rpms" % tmpdir):
831                if not self.ship_configs(host, user,
832                        "%s/rpms" % tmpdir, tarfiles_dir):
833                    return False
834            # Stage the new configuration (active experiments will stay swapped
835            # in now)
836            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
837            try:
838                if not self.ssh_cmd(user, host,
839                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
840                                (pid, eid, tclfile),
841                        "modexp", timeout= 60 * 10):
842                    return False
843            except self.ssh_cmd_timeout:
844                self.log.error("Modify command failed to complete in time")
845                # There's really no way to see if this succeeded or failed, so
846                # if it hangs, assume the worst.
847                return False
848            # Active experiments are still swapped, this swaps the others in.
849            if state != 'active':
850                self.log.info("[start_segment]: Swapping %s in on %s" % \
851                        (eid, tb))
852                timedout = False
853                try:
854                    if not self.ssh_cmd(user, host,
855                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
856                            "swapexp", timeout=10*60):
857                        return False
858                except self.ssh_cmd_timeout:
859                    timedout = True
860               
861                # If the command was terminated, but completed successfully,
862                # report success.
863                if timedout:
864                    self.log.debug("[start_segment]: swapin timed out " +\
865                            "checking state")
866                    state = self.get_state(user, host, tb, pid, eid)
867                    self.log.debug("[start_segment]: state is %s" % state)
868                    return state == 'active'
869            # Everything has gone OK.
870            return True
871
872    class stop_segment(emulab_segment):
873        def __init__(self, log=None, keyfile=None, debug=False):
874            experiment_control_local.emulab_segment.__init__(self,
875                    log=log, keyfile=keyfile, debug=debug)
876
877        def __call__(self, tb, eid, tbparams):
878            """
879            Stop a sub experiment by calling swapexp on the federant
880            """
881            user = tbparams[tb]['user']
882            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
883            pid = tbparams[tb]['project']
884
885            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
886            rv = False
887            try:
888                # Clean out tar files: we've gone over quota in the past
889                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
890                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
891                        (pid, eid))
892                rv = self.ssh_cmd(user, host,
893                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
894            except self.ssh_cmd_timeout:
895                rv = False
896            return rv
897
898       
899    def generate_ssh_keys(self, dest, type="rsa" ):
900        """
901        Generate a set of keys for the gateways to use to talk.
902
903        Keys are of type type and are stored in the required dest file.
904        """
905        valid_types = ("rsa", "dsa")
906        t = type.lower();
907        if t not in valid_types: raise ValueError
908        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
909
910        try:
911            trace = open("/dev/null", "w")
912        except IOError:
913            raise service_error(service_error.internal,
914                    "Cannot open /dev/null??");
915
916        # May raise CalledProcessError
917        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
918        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
919        if rv != 0:
920            raise service_error(service_error.internal, 
921                    "Cannot generate nonce ssh keys.  %s return code %d" \
922                            % (self.ssh_keygen, rv))
923
924    def gentopo(self, str):
925        """
926        Generate the topology dtat structure from the splitter's XML
927        representation of it.
928
929        The topology XML looks like:
930            <experiment>
931                <nodes>
932                    <node><vname></vname><ips>ip1:ip2</ips></node>
933                </nodes>
934                <lans>
935                    <lan>
936                        <vname></vname><vnode></vnode><ip></ip>
937                        <bandwidth></bandwidth><member>node:port</member>
938                    </lan>
939                </lans>
940        """
941        class topo_parse:
942            """
943            Parse the topology XML and create the dats structure.
944            """
945            def __init__(self):
946                # Typing of the subelements for data conversion
947                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
948                self.int_subelements = ( 'bandwidth',)
949                self.float_subelements = ( 'delay',)
950                # The final data structure
951                self.nodes = [ ]
952                self.lans =  [ ]
953                self.topo = { \
954                        'node': self.nodes,\
955                        'lan' : self.lans,\
956                    }
957                self.element = { }  # Current element being created
958                self.chars = ""     # Last text seen
959
960            def end_element(self, name):
961                # After each sub element the contents is added to the current
962                # element or to the appropriate list.
963                if name == 'node':
964                    self.nodes.append(self.element)
965                    self.element = { }
966                elif name == 'lan':
967                    self.lans.append(self.element)
968                    self.element = { }
969                elif name in self.str_subelements:
970                    self.element[name] = self.chars
971                    self.chars = ""
972                elif name in self.int_subelements:
973                    self.element[name] = int(self.chars)
974                    self.chars = ""
975                elif name in self.float_subelements:
976                    self.element[name] = float(self.chars)
977                    self.chars = ""
978
979            def found_chars(self, data):
980                self.chars += data.rstrip()
981
982
983        tp = topo_parse();
984        parser = xml.parsers.expat.ParserCreate()
985        parser.EndElementHandler = tp.end_element
986        parser.CharacterDataHandler = tp.found_chars
987
988        parser.Parse(str)
989
990        return tp.topo
991       
992
993    def genviz(self, topo):
994        """
995        Generate the visualization the virtual topology
996        """
997
998        neato = "/usr/local/bin/neato"
999        # These are used to parse neato output and to create the visualization
1000        # file.
1001        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
1002        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
1003                "%s</type></node>"
1004
1005        try:
1006            # Node names
1007            nodes = [ n['vname'] for n in topo['node'] ]
1008            topo_lans = topo['lan']
1009        except KeyError, e:
1010            raise service_error(service_error.internal, "Bad topology: %s" %e)
1011
1012        lans = { }
1013        links = { }
1014
1015        # Walk through the virtual topology, organizing the connections into
1016        # 2-node connections (links) and more-than-2-node connections (lans).
1017        # When a lan is created, it's added to the list of nodes (there's a
1018        # node in the visualization for the lan).
1019        for l in topo_lans:
1020            if links.has_key(l['vname']):
1021                if len(links[l['vname']]) < 2:
1022                    links[l['vname']].append(l['vnode'])
1023                else:
1024                    nodes.append(l['vname'])
1025                    lans[l['vname']] = links[l['vname']]
1026                    del links[l['vname']]
1027                    lans[l['vname']].append(l['vnode'])
1028            elif lans.has_key(l['vname']):
1029                lans[l['vname']].append(l['vnode'])
1030            else:
1031                links[l['vname']] = [ l['vnode'] ]
1032
1033
1034        # Open up a temporary file for dot to turn into a visualization
1035        try:
1036            df, dotname = tempfile.mkstemp()
1037            dotfile = os.fdopen(df, 'w')
1038        except IOError:
1039            raise service_error(service_error.internal,
1040                    "Failed to open file in genviz")
1041
1042        try:
1043            dnull = open('/dev/null', 'w')
1044        except IOError:
1045            service_error(service_error.internal,
1046                    "Failed to open /dev/null in genviz")
1047
1048        # Generate a dot/neato input file from the links, nodes and lans
1049        try:
1050            print >>dotfile, "graph G {"
1051            for n in nodes:
1052                print >>dotfile, '\t"%s"' % n
1053            for l in links.keys():
1054                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1055            for l in lans.keys():
1056                for n in lans[l]:
1057                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1058            print >>dotfile, "}"
1059            dotfile.close()
1060        except TypeError:
1061            raise service_error(service_error.internal,
1062                    "Single endpoint link in vtopo")
1063        except IOError:
1064            raise service_error(service_error.internal, "Cannot write dot file")
1065
1066        # Use dot to create a visualization
1067        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1068                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1069                close_fds=True)
1070        dnull.close()
1071
1072        # Translate dot to vis format
1073        vis_nodes = [ ]
1074        vis = { 'node': vis_nodes }
1075        for line in dot.stdout:
1076            m = vis_re.match(line)
1077            if m:
1078                vn = m.group(1)
1079                vis_node = {'name': vn, \
1080                        'x': float(m.group(2)),\
1081                        'y' : float(m.group(3)),\
1082                    }
1083                if vn in links.keys() or vn in lans.keys():
1084                    vis_node['type'] = 'lan'
1085                else:
1086                    vis_node['type'] = 'node'
1087                vis_nodes.append(vis_node)
1088        rv = dot.wait()
1089
1090        os.remove(dotname)
1091        if rv == 0 : return vis
1092        else: return None
1093
1094    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1095            access_user):
1096        """
1097        Get access to testbed through fedd and set the parameters for that tb
1098        """
1099        uri = self.tbmap.get(tb, None)
1100        if not uri:
1101            raise service_error(serice_error.server_config, 
1102                    "Unknown testbed: %s" % tb)
1103
1104        # currently this lumps all users into one service access group
1105        service_keys = [ a for u in user \
1106                for a in u.get('access', []) \
1107                    if a.has_key('sshPubkey')]
1108
1109        if len(service_keys) == 0:
1110            raise service_error(service_error.req, 
1111                    "Must have at least one SSH pubkey for services")
1112
1113
1114        for p, u in access_user:
1115            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1116                    "to %s") %  ((p or "None"), u, uri))
1117
1118            if p:
1119                # Request with user and project specified
1120                req = {\
1121                        'destinationTestbed' : { 'uri' : uri },
1122                        'project': { 
1123                            'name': {'localname': p},
1124                            'user': [ {'userID': { 'localname': u } } ],
1125                            },
1126                        'user':  user,
1127                        'allocID' : { 'localname': 'test' },
1128                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1129                        'serviceAccess' : service_keys
1130                    }
1131            else:
1132                # Request with only user specified
1133                req = {\
1134                        'destinationTestbed' : { 'uri' : uri },
1135                        'user':  [ {'userID': { 'localname': u } } ],
1136                        'allocID' : { 'localname': 'test' },
1137                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1138                        'serviceAccess' : service_keys
1139                    }
1140
1141            if tb == master:
1142                # NB, the export_project parameter is a dict that includes
1143                # the type
1144                req['exportProject'] = export_project
1145
1146            # node resources if any
1147            if nodes != None and len(nodes) > 0:
1148                rnodes = [ ]
1149                for n in nodes:
1150                    rn = { }
1151                    image, hw, count = n.split(":")
1152                    if image: rn['image'] = [ image ]
1153                    if hw: rn['hardware'] = [ hw ]
1154                    if count and int(count) >0 : rn['count'] = int(count)
1155                    rnodes.append(rn)
1156                req['resources']= { }
1157                req['resources']['node'] = rnodes
1158
1159            try:
1160                if self.local_access.has_key(uri):
1161                    # Local access call
1162                    req = { 'RequestAccessRequestBody' : req }
1163                    r = self.local_access[uri].RequestAccess(req, 
1164                            fedid(file=self.cert_file))
1165                    r = { 'RequestAccessResponseBody' : r }
1166                else:
1167                    r = self.call_RequestAccess(uri, req, 
1168                            self.cert_file, self.cert_pwd, self.trusted_certs)
1169            except service_error, e:
1170                if e.code == service_error.access:
1171                    self.log.debug("[get_access] Access denied")
1172                    r = None
1173                    continue
1174                else:
1175                    raise e
1176
1177            if r.has_key('RequestAccessResponseBody'):
1178                # Through to here we have a valid response, not a fault.
1179                # Access denied is a fault, so something better or worse than
1180                # access denied has happened.
1181                r = r['RequestAccessResponseBody']
1182                self.log.debug("[get_access] Access granted")
1183                break
1184            else:
1185                raise service_error(service_error.protocol,
1186                        "Bad proxy response")
1187       
1188        if not r:
1189            raise service_error(service_error.access, 
1190                    "Access denied by %s (%s)" % (tb, uri))
1191
1192        e = r['emulab']
1193        p = e['project']
1194        tbparam[tb] = { 
1195                "boss": e['boss'],
1196                "host": e['ops'],
1197                "domain": e['domain'],
1198                "fs": e['fileServer'],
1199                "eventserver": e['eventServer'],
1200                "project": unpack_id(p['name']),
1201                "emulab" : e,
1202                "allocID" : r['allocID'],
1203                }
1204        # Make the testbed name be the label the user applied
1205        p['testbed'] = {'localname': tb }
1206
1207        for u in p['user']:
1208            role = u.get('role', None)
1209            if role == 'experimentCreation':
1210                tbparam[tb]['user'] = unpack_id(u['userID'])
1211                break
1212        else:
1213            raise service_error(service_error.internal, 
1214                    "No createExperimentUser from %s" %tb)
1215
1216        # Add attributes to barameter space.  We don't allow attributes to
1217        # overlay any parameters already installed.
1218        for a in e['fedAttr']:
1219            try:
1220                if a['attribute'] and isinstance(a['attribute'], basestring)\
1221                        and not tbparam[tb].has_key(a['attribute'].lower()):
1222                    tbparam[tb][a['attribute'].lower()] = a['value']
1223            except KeyError:
1224                self.log.error("Bad attribute in response: %s" % a)
1225       
1226    def release_access(self, tb, aid):
1227        """
1228        Release access to testbed through fedd
1229        """
1230
1231        uri = self.tbmap.get(tb, None)
1232        if not uri:
1233            raise service_error(serice_error.server_config, 
1234                    "Unknown testbed: %s" % tb)
1235
1236        if self.local_access.has_key(uri):
1237            resp = self.local_access[uri].ReleaseAccess(\
1238                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1239                    fedid(file=self.cert_file))
1240            resp = { 'ReleaseAccessResponseBody': resp } 
1241        else:
1242            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1243                    self.cert_file, self.cert_pwd, self.trusted_certs)
1244
1245        # better error coding
1246
1247    def remote_splitter(self, uri, desc, master):
1248
1249        req = {
1250                'description' : { 'ns2description': desc },
1251                'master': master,
1252                'include_fedkit': bool(self.fedkit),
1253                'include_gatewaykit': bool(self.gatewaykit)
1254            }
1255
1256        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1257                self.trusted_certs)
1258
1259        if r.has_key('Ns2SplitResponseBody'):
1260            r = r['Ns2SplitResponseBody']
1261            if r.has_key('output'):
1262                return r['output'].splitlines()
1263            else:
1264                raise service_error(service_error.protocol, 
1265                        "Bad splitter response (no output)")
1266        else:
1267            raise service_error(service_error.protocol, "Bad splitter response")
1268       
1269    class current_testbed:
1270        """
1271        Object for collecting the current testbed description.  The testbed
1272        description is saved to a file with the local testbed variables
1273        subsittuted line by line.
1274        """
1275        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1276            def tar_list_to_string(tl):
1277                if tl is None: return None
1278
1279                rv = ""
1280                for t in tl:
1281                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1282                            (t[0], os.path.basename(t[1]))
1283                return rv
1284
1285
1286            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1287            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1288            self.current_testbed = None
1289            self.testbed_file = None
1290
1291            self.def_expstart = \
1292                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1293            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1294            self.def_gwstart = \
1295                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1296            self.def_mgwstart = \
1297                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1298            self.def_gwimage = "FBSD61-TUNNEL2";
1299            self.def_gwtype = "pc";
1300            self.def_mgwcmd = '# '
1301            self.def_mgwcmdparams = ''
1302            self.def_gwcmd = '# '
1303            self.def_gwcmdparams = ''
1304
1305            self.eid = eid
1306            self.tmpdir = tmpdir
1307            # Convert fedkit and gateway kit (which are lists of tuples) into a
1308            # substituition string.
1309            self.fedkit = tar_list_to_string(fedkit)
1310            self.gatewaykit = tar_list_to_string(gatewaykit)
1311
1312        def __call__(self, line, master, allocated, tbparams):
1313            # Capture testbed topology descriptions
1314            if self.current_testbed == None:
1315                m = self.begin_testbed.match(line)
1316                if m != None:
1317                    self.current_testbed = m.group(1)
1318                    if self.current_testbed == None:
1319                        raise service_error(service_error.req,
1320                                "Bad request format (unnamed testbed)")
1321                    allocated[self.current_testbed] = \
1322                            allocated.get(self.current_testbed,0) + 1
1323                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1324                    if not os.path.exists(tb_dir):
1325                        try:
1326                            os.mkdir(tb_dir)
1327                        except IOError:
1328                            raise service_error(service_error.internal,
1329                                    "Cannot create %s" % tb_dir)
1330                    try:
1331                        self.testbed_file = open("%s/%s.%s.tcl" %
1332                                (tb_dir, self.eid, self.current_testbed), 'w')
1333                    except IOError:
1334                        self.testbed_file = None
1335                    return True
1336                else: return False
1337            else:
1338                m = self.end_testbed.match(line)
1339                if m != None:
1340                    if m.group(1) != self.current_testbed:
1341                        raise service_error(service_error.internal, 
1342                                "Mismatched testbed markers!?")
1343                    if self.testbed_file != None: 
1344                        self.testbed_file.close()
1345                        self.testbed_file = None
1346                    self.current_testbed = None
1347                elif self.testbed_file:
1348                    # Substitute variables and put the line into the local
1349                    # testbed file.
1350                    gwtype = tbparams[self.current_testbed].get(\
1351                            'connectortype', self.def_gwtype)
1352                    gwimage = tbparams[self.current_testbed].get(\
1353                            'connectorimage', self.def_gwimage)
1354                    mgwstart = tbparams[self.current_testbed].get(\
1355                            'masterconnectorstartcmd', self.def_mgwstart)
1356                    mexpstart = tbparams[self.current_testbed].get(\
1357                            'masternodestartcmd', self.def_mexpstart)
1358                    gwstart = tbparams[self.current_testbed].get(\
1359                            'slaveconnectorstartcmd', self.def_gwstart)
1360                    expstart = tbparams[self.current_testbed].get(\
1361                            'slavenodestartcmd', self.def_expstart)
1362                    project = tbparams[self.current_testbed].get('project')
1363                    gwcmd = tbparams[self.current_testbed].get(\
1364                            'slaveconnectorcmd', self.def_gwcmd)
1365                    gwcmdparams = tbparams[self.current_testbed].get(\
1366                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1367                    mgwcmd = tbparams[self.current_testbed].get(\
1368                            'masterconnectorcmd', self.def_gwcmd)
1369                    mgwcmdparams = tbparams[self.current_testbed].get(\
1370                            'masterconnectorcmdparams', self.def_gwcmdparams)
1371                    line = re.sub("GWTYPE", gwtype, line)
1372                    line = re.sub("GWIMAGE", gwimage, line)
1373                    if self.current_testbed == master:
1374                        line = re.sub("GWSTART", mgwstart, line)
1375                        line = re.sub("EXPSTART", mexpstart, line)
1376                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1377                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1378                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1379                    else:
1380                        line = re.sub("GWSTART", gwstart, line)
1381                        line = re.sub("EXPSTART", expstart, line)
1382                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1383                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1384                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1385                    #These expansions contain EID and PROJDIR.  NB these are
1386                    # local fedkit and gatewaykit, which are strings.
1387                    if self.fedkit:
1388                        line = re.sub("FEDKIT", self.fedkit, line)
1389                    if self.gatewaykit:
1390                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1391                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1392                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1393                    line = re.sub("EID", self.eid, line)
1394                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1395                            (project, self.eid), line)
1396                    print >>self.testbed_file, line
1397                return True
1398
1399    class allbeds:
1400        """
1401        Process the Allbeds section.  Get access to each federant and save the
1402        parameters in tbparams
1403        """
1404        def __init__(self, get_access):
1405            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1406            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1407            self.in_allbeds = False
1408            self.get_access = get_access
1409
1410        def __call__(self, line, user, tbparams, master, export_project,
1411                access_user):
1412            # Testbed access parameters
1413            if not self.in_allbeds:
1414                if self.begin_allbeds.match(line):
1415                    self.in_allbeds = True
1416                    return True
1417                else:
1418                    return False
1419            else:
1420                if self.end_allbeds.match(line):
1421                    self.in_allbeds = False
1422                else:
1423                    nodes = line.split('|')
1424                    tb = nodes.pop(0)
1425                    self.get_access(tb, nodes, user, tbparams, master,
1426                            export_project, access_user)
1427                return True
1428
1429    class gateways:
1430        def __init__(self, eid, master, tmpdir, gw_pubkey,
1431                gw_secretkey, copy_file, fedkit):
1432            self.begin_gateways = \
1433                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1434            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1435            self.current_gateways = None
1436            self.control_gateway = None
1437            self.active_end = { }
1438
1439            self.eid = eid
1440            self.master = master
1441            self.tmpdir = tmpdir
1442            self.gw_pubkey_base = gw_pubkey
1443            self.gw_secretkey_base = gw_secretkey
1444
1445            self.copy_file = copy_file
1446            self.fedkit = fedkit
1447
1448
1449        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1450                active_end, tbparams, dtb, myname, desthost, type):
1451            """
1452            Produce a gateway configuration file from a gateways line.
1453            """
1454
1455            sproject = tbparams[gw].get('project', 'project')
1456            dproject = tbparams[dtb].get('project', 'project')
1457            sdomain = ".%s.%s%s" % (eid, sproject,
1458                    tbparams[gw].get('domain', ".example.com"))
1459            ddomain = ".%s.%s%s" % (eid, dproject,
1460                    tbparams[dtb].get('domain', ".example.com"))
1461            boss = tbparams[master].get('boss', "boss")
1462            fs = tbparams[master].get('fs', "fs")
1463            event_server = "%s%s" % \
1464                    (tbparams[gw].get('eventserver', "event_server"),
1465                            tbparams[gw].get('domain', "example.com"))
1466            remote_event_server = "%s%s" % \
1467                    (tbparams[dtb].get('eventserver', "event_server"),
1468                            tbparams[dtb].get('domain', "example.com"))
1469            seer_control = "%s%s" % \
1470                    (tbparams[gw].get('control', "control"), sdomain)
1471            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1472
1473            if self.fedkit:
1474                remote_script_dir = "/usr/local/federation/bin"
1475                local_script_dir = "/usr/local/federation/bin"
1476            else:
1477                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1478                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1479
1480            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1481            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1482            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1483
1484            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1485            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1486
1487            # translate to lower case so the `hostname` hack for specifying
1488            # configuration files works.
1489            conf_file = conf_file.lower();
1490            remote_conf_file = remote_conf_file.lower();
1491
1492            if dtb == master:
1493                active = "false"
1494            elif gw == master:
1495                active = "true"
1496            elif active_end.has_key('%s-%s' % (dtb, gw)):
1497                active = "false"
1498            else:
1499                active_end['%s-%s' % (gw, dtb)] = 1
1500                active = "true"
1501
1502            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1503            print >>gwconfig, "Active: %s" % active
1504            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1505            if tunnel_iface:
1506                print >>gwconfig, "Interface: %s" % tunnel_iface
1507            print >>gwconfig, "BossName: %s" % boss
1508            print >>gwconfig, "FsName: %s" % fs
1509            print >>gwconfig, "EventServerName: %s" % event_server
1510            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1511            print >>gwconfig, "SeerControl: %s" % seer_control
1512            print >>gwconfig, "Type: %s" % type
1513            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1514            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1515                    local_script_dir
1516            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1517            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1518            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1519                    (remote_conf_dir, remote_conf_file)
1520            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1521            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1522            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1523            gwconfig.close()
1524
1525            return active == "true"
1526
1527        def __call__(self, line, allocated, tbparams):
1528            # Process gateways
1529            if not self.current_gateways:
1530                m = self.begin_gateways.match(line)
1531                if m:
1532                    self.current_gateways = m.group(1)
1533                    if allocated.has_key(self.current_gateways):
1534                        # This test should always succeed
1535                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1536                        if not os.path.exists(tb_dir):
1537                            try:
1538                                os.mkdir(tb_dir)
1539                            except IOError:
1540                                raise service_error(service_error.internal,
1541                                        "Cannot create %s" % tb_dir)
1542                    else:
1543                        # XXX
1544                        self.log.error("[gateways]: Ignoring gateways for " + \
1545                                "unknown testbed %s" % self.current_gateways)
1546                        self.current_gateways = None
1547                    return True
1548                else:
1549                    return False
1550            else:
1551                m = self.end_gateways.match(line)
1552                if m :
1553                    if m.group(1) != self.current_gateways:
1554                        raise service_error(service_error.internal,
1555                                "Mismatched gateway markers!?")
1556                    if self.control_gateway:
1557                        try:
1558                            cc = open("%s/%s/client.conf" %
1559                                    (self.tmpdir, self.current_gateways), 'w')
1560                            print >>cc, "ControlGateway: %s" % \
1561                                    self.control_gateway
1562                            if tbparams[self.master].has_key('smbshare'):
1563                                print >>cc, "SMBSHare: %s" % \
1564                                        tbparams[self.master]['smbshare']
1565                            print >>cc, "ProjectUser: %s" % \
1566                                    tbparams[self.master]['user']
1567                            print >>cc, "ProjectName: %s" % \
1568                                    tbparams[self.master]['project']
1569                            print >>cc, "ExperimentID: %s/%s" % \
1570                                    ( tbparams[self.master]['project'], \
1571                                    self.eid )
1572                            cc.close()
1573                        except IOError:
1574                            raise service_error(service_error.internal,
1575                                    "Error creating client config")
1576                        # XXX: This seer specific file should disappear
1577                        try:
1578                            cc = open("%s/%s/seer.conf" %
1579                                    (self.tmpdir, self.current_gateways),
1580                                    'w')
1581                            if self.current_gateways != self.master:
1582                                print >>cc, "ControlNode: %s" % \
1583                                        self.control_gateway
1584                            print >>cc, "ExperimentID: %s/%s" % \
1585                                    ( tbparams[self.master]['project'], \
1586                                    self.eid )
1587                            cc.close()
1588                        except IOError:
1589                            raise service_error(service_error.internal,
1590                                    "Error creating seer config")
1591                    else:
1592                        debug.error("[gateways]: No control gateway for %s" %\
1593                                    self.current_gateways)
1594                    self.current_gateways = None
1595                else:
1596                    dtb, myname, desthost, type = line.split(" ")
1597
1598                    if type == "control" or type == "both":
1599                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1600                                self.eid, 
1601                                tbparams[self.current_gateways]['project'],
1602                                tbparams[self.current_gateways]['domain'])
1603                    try:
1604                        active = self.gateway_conf_file(self.current_gateways,
1605                                self.master, self.eid, self.gw_pubkey_base,
1606                                self.gw_secretkey_base,
1607                                self.active_end, tbparams, dtb, myname,
1608                                desthost, type)
1609                    except IOError, e:
1610                        raise service_error(service_error.internal,
1611                                "Failed to write config file for %s" % \
1612                                        self.current_gateway)
1613           
1614                    gw_pubkey = "%s/keys/%s" % \
1615                            (self.tmpdir, self.gw_pubkey_base)
1616                    gw_secretkey = "%s/keys/%s" % \
1617                            (self.tmpdir, self.gw_secretkey_base)
1618
1619                    pkfile = "%s/%s/%s" % \
1620                            ( self.tmpdir, self.current_gateways, 
1621                                    self.gw_pubkey_base)
1622                    skfile = "%s/%s/%s" % \
1623                            ( self.tmpdir, self.current_gateways, 
1624                                    self.gw_secretkey_base)
1625
1626                    if not os.path.exists(pkfile):
1627                        try:
1628                            self.copy_file(gw_pubkey, pkfile)
1629                        except IOError:
1630                            service_error(service_error.internal,
1631                                    "Failed to copy pubkey file")
1632
1633                    if active and not os.path.exists(skfile):
1634                        try:
1635                            self.copy_file(gw_secretkey, skfile)
1636                        except IOError:
1637                            service_error(service_error.internal,
1638                                    "Failed to copy secretkey file")
1639                return True
1640
1641    class shunt_to_file:
1642        """
1643        Simple class to write data between two regexps to a file.
1644        """
1645        def __init__(self, begin, end, filename):
1646            """
1647            Begin shunting on a match of begin, stop on end, send data to
1648            filename.
1649            """
1650            self.begin = re.compile(begin)
1651            self.end = re.compile(end)
1652            self.in_shunt = False
1653            self.file = None
1654            self.filename = filename
1655
1656        def __call__(self, line):
1657            """
1658            Call this on each line in the input that may be shunted.
1659            """
1660            if not self.in_shunt:
1661                if self.begin.match(line):
1662                    self.in_shunt = True
1663                    try:
1664                        self.file = open(self.filename, "w")
1665                    except:
1666                        self.file = None
1667                        raise
1668                    return True
1669                else:
1670                    return False
1671            else:
1672                if self.end.match(line):
1673                    if self.file: 
1674                        self.file.close()
1675                        self.file = None
1676                    self.in_shunt = False
1677                else:
1678                    if self.file:
1679                        print >>self.file, line
1680                return True
1681
1682    class shunt_to_list:
1683        """
1684        Same interface as shunt_to_file.  Data collected in self.list, one list
1685        element per line.
1686        """
1687        def __init__(self, begin, end):
1688            self.begin = re.compile(begin)
1689            self.end = re.compile(end)
1690            self.in_shunt = False
1691            self.list = [ ]
1692       
1693        def __call__(self, line):
1694            if not self.in_shunt:
1695                if self.begin.match(line):
1696                    self.in_shunt = True
1697                    return True
1698                else:
1699                    return False
1700            else:
1701                if self.end.match(line):
1702                    self.in_shunt = False
1703                else:
1704                    self.list.append(line)
1705                return True
1706
1707    class shunt_to_string:
1708        """
1709        Same interface as shunt_to_file.  Data collected in self.str, all in
1710        one string.
1711        """
1712        def __init__(self, begin, end):
1713            self.begin = re.compile(begin)
1714            self.end = re.compile(end)
1715            self.in_shunt = False
1716            self.str = ""
1717       
1718        def __call__(self, line):
1719            if not self.in_shunt:
1720                if self.begin.match(line):
1721                    self.in_shunt = True
1722                    return True
1723                else:
1724                    return False
1725            else:
1726                if self.end.match(line):
1727                    self.in_shunt = False
1728                else:
1729                    self.str += line
1730                return True
1731
1732    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1733            tbparams, tmpdir, alloc_log=None):
1734        started = { }           # Testbeds where a sub-experiment started
1735                                # successfully
1736
1737        # XXX
1738        fail_soft = False
1739
1740        log = alloc_log or self.log
1741
1742        thread_pool = self.thread_pool(self.nthreads)
1743        threads = [ ]
1744
1745        for tb in [ k for k in allocated.keys() if k != master]:
1746            # Create and start a thread to start the segment, and save it to
1747            # get the return value later
1748            thread_pool.wait_for_slot()
1749            t  = self.pooled_thread(\
1750                    target=self.start_segment(log=log,
1751                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1752                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1753                    pdata=thread_pool, trace_file=self.trace_file)
1754            threads.append(t)
1755            t.start()
1756
1757        # Wait until all finish
1758        thread_pool.wait_for_all_done()
1759
1760        # If none failed, start the master
1761        failed = [ t.getName() for t in threads if not t.rv ]
1762
1763        if len(failed) == 0:
1764            starter = self.start_segment(log=log, 
1765                    keyfile=self.ssh_privkey_file, debug=self.debug)
1766            if not starter(master, eid, tbparams, tmpdir):
1767                failed.append(master)
1768
1769        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1770        # If one failed clean up, unless fail_soft is set
1771        if failed:
1772            if not fail_soft:
1773                thread_pool.clear()
1774                for tb in succeeded:
1775                    # Create and start a thread to stop the segment
1776                    thread_pool.wait_for_slot()
1777                    t  = self.pooled_thread(\
1778                            target=self.stop_segment(log=log,
1779                                keyfile=self.ssh_privkey_file,
1780                                debug=self.debug), 
1781                            args=(tb, eid, tbparams), name=tb,
1782                            pdata=thread_pool, trace_file=self.trace_file)
1783                    t.start()
1784                # Wait until all finish
1785                thread_pool.wait_for_all_done()
1786
1787                # release the allocations
1788                for tb in tbparams.keys():
1789                    self.release_access(tb, tbparams[tb]['allocID'])
1790                # Remove the placeholder
1791                self.state_lock.acquire()
1792                self.state[eid]['experimentStatus'] = 'failed'
1793                if self.state_filename: self.write_state()
1794                self.state_lock.release()
1795
1796                #raise service_error(service_error.federant,
1797                #    "Swap in failed on %s" % ",".join(failed))
1798                log.error("Swap in failed on %s" % ",".join(failed))
1799                return
1800        else:
1801            log.info("[start_segment]: Experiment %s active" % eid)
1802
1803        log.debug("[start_experiment]: removing %s" % tmpdir)
1804
1805        # Walk up tmpdir, deleting as we go
1806        for path, dirs, files in os.walk(tmpdir, topdown=False):
1807            for f in files:
1808                os.remove(os.path.join(path, f))
1809            for d in dirs:
1810                os.rmdir(os.path.join(path, d))
1811        os.rmdir(tmpdir)
1812
1813        # Insert the experiment into our state and update the disk copy
1814        self.state_lock.acquire()
1815        self.state[expid]['experimentStatus'] = 'active'
1816        self.state[eid] = self.state[expid]
1817        if self.state_filename: self.write_state()
1818        self.state_lock.release()
1819        return
1820
1821    def create_experiment(self, req, fid):
1822        """
1823        The external interface to experiment creation called from the
1824        dispatcher.
1825
1826        Creates a working directory, splits the incoming description using the
1827        splitter script and parses out the avrious subsections using the
1828        lcasses above.  Once each sub-experiment is created, use pooled threads
1829        to instantiate them and start it all up.
1830        """
1831
1832        if not self.auth.check_attribute(fid, 'create'):
1833            raise service_error(service_error.access, "Create access denied")
1834
1835        try:
1836            tmpdir = tempfile.mkdtemp(prefix="split-")
1837        except IOError:
1838            raise service_error(service_error.internal, "Cannot create tmp dir")
1839
1840        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1841        gw_secretkey_base = "fed.%s" % self.ssh_type
1842        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1843        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1844        tclfile = tmpdir + "/experiment.tcl"
1845        tbparams = { }
1846        try:
1847            access_user = self.accessdb[fid]
1848        except KeyError:
1849            raise service_error(service_error.internal,
1850                    "Access map and authorizer out of sync in " + \
1851                            "create_experiment for fedid %s"  % fid)
1852
1853        pid = "dummy"
1854        gid = "dummy"
1855        try:
1856            os.mkdir(tmpdir+"/keys")
1857        except OSError:
1858            raise service_error(service_error.internal,
1859                    "Can't make temporary dir")
1860
1861        req = req.get('CreateRequestBody', None)
1862        if not req:
1863            raise service_error(service_error.req,
1864                    "Bad request format (no CreateRequestBody)")
1865        # The tcl parser needs to read a file so put the content into that file
1866        descr=req.get('experimentdescription', None)
1867        if descr:
1868            file_content=descr.get('ns2description', None)
1869            if file_content:
1870                try:
1871                    f = open(tclfile, 'w')
1872                    f.write(file_content)
1873                    f.close()
1874                except IOError:
1875                    raise service_error(service_error.internal,
1876                            "Cannot write temp experiment description")
1877            else:
1878                raise service_error(service_error.req, 
1879                        "Only ns2descriptions supported")
1880        else:
1881            raise service_error(service_error.req, "No experiment description")
1882
1883        # Generate an ID for the experiment (slice) and a certificate that the
1884        # allocator can use to prove they own it.  We'll ship it back through
1885        # the encrypted connection.
1886        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1887
1888        if req.has_key('experimentID') and \
1889                req['experimentID'].has_key('localname'):
1890            overwrite = False
1891            eid = req['experimentID']['localname']
1892            # If there's an old failed experiment here with the same local name
1893            # and accessible by this user, we'll overwrite it, otherwise we'll
1894            # fall through and do the collision avoidance.
1895            old_expid = self.get_experiment_fedid(eid)
1896            if old_expid and self.check_experiment_access(fid, old_expid):
1897                self.state_lock.acquire()
1898                status = self.state[eid].get('experimentStatus', None)
1899                if status and status == 'failed':
1900                    # remove the old access attribute
1901                    self.auth.unset_attribute(fid, old_expid)
1902                    overwrite = True
1903                    del self.state[eid]
1904                    del self.state[old_expid]
1905                self.state_lock.release()
1906            self.state_lock.acquire()
1907            while (self.state.has_key(eid) and not overwrite):
1908                eid += random.choice(string.ascii_letters)
1909            # Initial state
1910            self.state[eid] = {
1911                    'experimentID' : \
1912                            [ { 'localname' : eid }, {'fedid': expid } ],
1913                    'experimentStatus': 'starting',
1914                    'experimentAccess': { 'X509' : expcert },
1915                    'owner': fid,
1916                    'log' : [],
1917                }
1918            self.state[expid] = self.state[eid]
1919            if self.state_filename: self.write_state()
1920            self.state_lock.release()
1921        else:
1922            eid = self.exp_stem
1923            for i in range(0,5):
1924                eid += random.choice(string.ascii_letters)
1925            self.state_lock.acquire()
1926            while (self.state.has_key(eid)):
1927                eid = self.exp_stem
1928                for i in range(0,5):
1929                    eid += random.choice(string.ascii_letters)
1930            # Initial state
1931            self.state[eid] = {
1932                    'experimentID' : \
1933                            [ { 'localname' : eid }, {'fedid': expid } ],
1934                    'experimentStatus': 'starting',
1935                    'experimentAccess': { 'X509' : expcert },
1936                    'owner': fid,
1937                    'log' : [],
1938                }
1939            self.state[expid] = self.state[eid]
1940            if self.state_filename: self.write_state()
1941            self.state_lock.release()
1942
1943        try: 
1944            # This catches exceptions to clear the placeholder if necessary
1945            try:
1946                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1947            except ValueError:
1948                raise service_error(service_error.server_config, 
1949                        "Bad key type (%s)" % self.ssh_type)
1950
1951            user = req.get('user', None)
1952            if user == None:
1953                raise service_error(service_error.req, "No user")
1954
1955            master = req.get('master', None)
1956            if not master:
1957                raise service_error(service_error.req,
1958                        "No master testbed label")
1959            export_project = req.get('exportProject', None)
1960            if not export_project:
1961                raise service_error(service_error.req, "No export project")
1962           
1963            if self.splitter_url:
1964                self.log.debug("Calling remote splitter at %s" % \
1965                        self.splitter_url)
1966                split_data = self.remote_splitter(self.splitter_url,
1967                        file_content, master)
1968            else:
1969                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1970                    str(self.muxmax), '-m', master]
1971
1972                if self.fedkit:
1973                    tclcmd.append('-k')
1974
1975                if self.gatewaykit:
1976                    tclcmd.append('-K')
1977
1978                tclcmd.extend([pid, gid, eid, tclfile])
1979
1980                self.log.debug("running local splitter %s", " ".join(tclcmd))
1981                # This is just fantastic.  As a side effect the parser copies
1982                # tb_compat.tcl into the current directory, so that directory
1983                # must be writable by the fedd user.  Doing this in the
1984                # temporary subdir ensures this is the case.
1985                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1986                        cwd=tmpdir)
1987                split_data = tclparser.stdout
1988
1989            allocated = { }         # Testbeds we can access
1990            # Objects to parse the splitter output (defined above)
1991            parse_current_testbed = self.current_testbed(eid, tmpdir,
1992                    self.fedkit, self.gatewaykit)
1993            parse_allbeds = self.allbeds(self.get_access)
1994            parse_gateways = self.gateways(eid, master, tmpdir,
1995                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1996                    self.fedkit)
1997            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1998                        "^#\s+End\s+Vtopo")
1999            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
2000                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
2001            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
2002                    "^#\s+End\s+tarfiles")
2003            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
2004                    "^#\s+End\s+rpms")
2005
2006            # Working on the split data
2007            for line in split_data:
2008                line = line.rstrip()
2009                if parse_current_testbed(line, master, allocated, tbparams):
2010                    continue
2011                elif parse_allbeds(line, user, tbparams, master, export_project,
2012                        access_user):
2013                    continue
2014                elif parse_gateways(line, allocated, tbparams):
2015                    continue
2016                elif parse_vtopo(line):
2017                    continue
2018                elif parse_hostnames(line):
2019                    continue
2020                elif parse_tarfiles(line):
2021                    continue
2022                elif parse_rpms(line):
2023                    continue
2024                else:
2025                    raise service_error(service_error.internal, 
2026                            "Bad tcl parse? %s" % line)
2027            # Virtual topology and visualization
2028            vtopo = self.gentopo(parse_vtopo.str)
2029            if not vtopo:
2030                raise service_error(service_error.internal, 
2031                        "Failed to generate virtual topology")
2032
2033            vis = self.genviz(vtopo)
2034            if not vis:
2035                raise service_error(service_error.internal, 
2036                        "Failed to generate visualization")
2037
2038           
2039            # save federant information
2040            for k in allocated.keys():
2041                tbparams[k]['federant'] = {\
2042                        'name': [ { 'localname' : eid} ],\
2043                        'emulab': tbparams[k]['emulab'],\
2044                        'allocID' : tbparams[k]['allocID'],\
2045                        'master' : k == master,\
2046                    }
2047
2048            self.state_lock.acquire()
2049            self.state[eid]['vtopo'] = vtopo
2050            self.state[eid]['vis'] = vis
2051            self.state[expid]['federant'] = \
2052                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2053                        if tbparams[tb].has_key('federant') ]
2054            if self.state_filename: self.write_state()
2055            self.state_lock.release()
2056
2057            # Copy tarfiles and rpms needed at remote sites into a staging area
2058            try:
2059                if self.fedkit:
2060                    for t in self.fedkit:
2061                        parse_tarfiles.list.append(t[1])
2062                if self.gatewaykit:
2063                    for t in self.gatewaykit:
2064                        parse_tarfiles.list.append(t[1])
2065                for t in parse_tarfiles.list:
2066                    if not os.path.exists("%s/tarfiles" % tmpdir):
2067                        os.mkdir("%s/tarfiles" % tmpdir)
2068                    self.copy_file(t, "%s/tarfiles/%s" % \
2069                            (tmpdir, os.path.basename(t)))
2070                for r in parse_rpms.list:
2071                    if not os.path.exists("%s/rpms" % tmpdir):
2072                        os.mkdir("%s/rpms" % tmpdir)
2073                    self.copy_file(r, "%s/rpms/%s" % \
2074                            (tmpdir, os.path.basename(r)))
2075                # A null experiment file in case we need to create a remote
2076                # experiment from scratch
2077                f = open("%s/null.tcl" % tmpdir, "w")
2078                print >>f, """
2079set ns [new Simulator]
2080source tb_compat.tcl
2081
2082set a [$ns node]
2083
2084$ns rtproto Session
2085$ns run
2086"""
2087                f.close()
2088
2089            except IOError, e:
2090                raise service_error(service_error.internal, 
2091                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2092
2093        except service_error, e:
2094            # If something goes wrong in the parse (usually an access error)
2095            # clear the placeholder state.  From here on out the code delays
2096            # exceptions.  Failing at this point returns a fault to the remote
2097            # caller.
2098            self.state_lock.acquire()
2099            del self.state[eid]
2100            del self.state[expid]
2101            if self.state_filename: self.write_state()
2102            self.state_lock.release()
2103            raise e
2104
2105
2106        # Start the background swapper and return the starting state.  From
2107        # here on out, the state will stick around a while.
2108
2109        # Let users touch the state
2110        self.auth.set_attribute(fid, expid)
2111        self.auth.set_attribute(expid, expid)
2112        # Override fedids can manipulate state as well
2113        for o in self.overrides:
2114            self.auth.set_attribute(o, expid)
2115
2116        # Create a logger that logs to the experiment's state object as well as
2117        # to the main log file.
2118        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2119        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2120        # XXX: there should be a global one of these rather than repeating the
2121        # code.
2122        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2123                    '%d %b %y %H:%M:%S'))
2124        alloc_log.addHandler(h)
2125       
2126        # Start a thread to do the resource allocation
2127        t  = Thread(target=self.allocate_resources,
2128                args=(allocated, master, eid, expid, expcert, tbparams, 
2129                    tmpdir, alloc_log),
2130                name=eid)
2131        t.start()
2132
2133        rv = {
2134                'experimentID': [
2135                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2136                ],
2137                'experimentStatus': 'starting',
2138                'experimentAccess': { 'X509' : expcert }
2139            }
2140
2141        return rv
2142
2143    class new_start_segment:
2144        def __init__(self, debug=False, log=None, cert_file=None,
2145                cert_pwd=None, trusted_certs=None, caller=None):
2146            self.log = log
2147            self.debug = debug
2148            self.cert_file = cert_file
2149            self.cert_pwd = cert_pwd
2150            self.trusted_certs = None
2151            self.caller = caller
2152
2153        def __call__(self, uri, aid, topo, attrs=None):
2154            req = {
2155                    'allocID': { 'fedid' : aid }, 
2156                    'segmentdescription': { 
2157                        'topdldescription': topo.to_dict(),
2158                    },
2159                }
2160            if attrs:
2161                req['fedAttr'] = attrs
2162
2163            print req
2164            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2165                    self.trusted_certs)
2166            print r
2167            return True
2168
2169
2170   
2171
2172    def new_allocate_resources(self, allocated, master, eid, expid, expcert, 
2173            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
2174        started = { }           # Testbeds where a sub-experiment started
2175                                # successfully
2176
2177        # XXX
2178        fail_soft = False
2179
2180        log = alloc_log or self.log
2181
2182        thread_pool = self.thread_pool(self.nthreads)
2183        threads = [ ]
2184
2185        for tb in [ k for k in allocated.keys() if k != master]:
2186            # Create and start a thread to start the segment, and save it to
2187            # get the return value later
2188            thread_pool.wait_for_slot()
2189            uri = self.tbmap.get(tb, None)
2190            if not uri:
2191                raise service_error(service_error.internal, 
2192                        "Unknown testbed %s !?" % tb)
2193
2194            if tbparams[tb].has_key('allocID') and \
2195                    tbparams[tb]['allocID'].has_key('fedid'):
2196                aid = tbparams[tb]['allocID']['fedid']
2197            else:
2198                raise service_error(service_error.internal, 
2199                        "No alloc id for testbed %s !?" % tb)
2200
2201            t  = self.pooled_thread(\
2202                    target=self.new_start_segment(log=log, debug=self.debug,
2203                        cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2204                        trusted_certs=self.trusted_certs,
2205                        caller=self.call_StartSegment), 
2206                    args=(uri, aid, topo[tb], attrs), name=tb,
2207                    pdata=thread_pool, trace_file=self.trace_file)
2208            threads.append(t)
2209            t.start()
2210
2211        # Wait until all finish
2212        thread_pool.wait_for_all_done()
2213
2214        # If none failed, start the master
2215        failed = [ t.getName() for t in threads if not t.rv ]
2216
2217        if len(failed) == 0:
2218            uri = self.tbmap.get(master, None)
2219            if not uri:
2220                raise service_error(service_error.internal, 
2221                        "Unknown testbed %s !?" % master)
2222
2223            if tbparams[master].has_key('allocID') and \
2224                    tbparams[master]['allocID'].has_key('fedid'):
2225                aid = tbparams[master]['allocID']['fedid']
2226            else:
2227                raise service_error(service_error.internal, 
2228                    "No alloc id for testbed %s !?" % master)
2229            starter = self.new_start_segment(log=log, debug=self.debug,
2230                    cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2231                    trusted_certs=self.trusted_certs,
2232                    caller=self.call_StartSegment)
2233            if not starter(uri, aid, topo[master], attrs):
2234                failed.append(master)
2235
2236        succeeded = [tb for tb in allocated.keys() if tb not in failed]
2237        # If one failed clean up, unless fail_soft is set
2238        if failed and False:
2239            if not fail_soft:
2240                thread_pool.clear()
2241                for tb in succeeded:
2242                    # Create and start a thread to stop the segment
2243                    thread_pool.wait_for_slot()
2244                    t  = self.pooled_thread(\
2245                            target=self.stop_segment(log=log,
2246                                keyfile=self.ssh_privkey_file,
2247                                debug=self.debug), 
2248                            args=(tb, eid, tbparams), name=tb,
2249                            pdata=thread_pool, trace_file=self.trace_file)
2250                    t.start()
2251                # Wait until all finish
2252                thread_pool.wait_for_all_done()
2253
2254                # release the allocations
2255                for tb in tbparams.keys():
2256                    self.release_access(tb, tbparams[tb]['allocID'])
2257                # Remove the placeholder
2258                self.state_lock.acquire()
2259                self.state[eid]['experimentStatus'] = 'failed'
2260                if self.state_filename: self.write_state()
2261                self.state_lock.release()
2262
2263                log.error("Swap in failed on %s" % ",".join(failed))
2264                return
2265        else:
2266            log.info("[start_segment]: Experiment %s active" % eid)
2267
2268        log.debug("[start_experiment]: removing %s" % tmpdir)
2269
2270        # Walk up tmpdir, deleting as we go
2271        for path, dirs, files in os.walk(tmpdir, topdown=False):
2272            for f in files:
2273                os.remove(os.path.join(path, f))
2274            for d in dirs:
2275                os.rmdir(os.path.join(path, d))
2276        os.rmdir(tmpdir)
2277
2278        # Insert the experiment into our state and update the disk copy
2279        self.state_lock.acquire()
2280        self.state[expid]['experimentStatus'] = 'active'
2281        self.state[eid] = self.state[expid]
2282        if self.state_filename: self.write_state()
2283        self.state_lock.release()
2284        return
2285
2286
2287    def new_create_experiment(self, req, fid):
2288        """
2289        The external interface to experiment creation called from the
2290        dispatcher.
2291
2292        Creates a working directory, splits the incoming description using the
2293        splitter script and parses out the avrious subsections using the
2294        lcasses above.  Once each sub-experiment is created, use pooled threads
2295        to instantiate them and start it all up.
2296        """
2297
2298        def add_kit(e, kit):
2299            """
2300            Add a Software object created from the list of (install, location)
2301            tuples passed as kit  to the software attribute of an object e.  We
2302            do this enough to break out the code, but it's kind of a hack to
2303            avoid changing the old tuple rep.
2304            """
2305
2306            s = [ topdl.Software(install=i, location=l) for i, l in kit]
2307
2308            if isinstance(e.software, list): e.software.extend(s)
2309            else: e.software = s
2310
2311
2312        if not self.auth.check_attribute(fid, 'create'):
2313            raise service_error(service_error.access, "Create access denied")
2314
2315        try:
2316            tmpdir = tempfile.mkdtemp(prefix="split-")
2317        except IOError:
2318            raise service_error(service_error.internal, "Cannot create tmp dir")
2319
2320        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2321        gw_secretkey_base = "fed.%s" % self.ssh_type
2322        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2323        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2324        tclfile = tmpdir + "/experiment.tcl"
2325        tbparams = { }
2326        try:
2327            access_user = self.accessdb[fid]
2328        except KeyError:
2329            raise service_error(service_error.internal,
2330                    "Access map and authorizer out of sync in " + \
2331                            "create_experiment for fedid %s"  % fid)
2332
2333        pid = "dummy"
2334        gid = "dummy"
2335        try:
2336            os.mkdir(tmpdir+"/keys")
2337        except OSError:
2338            raise service_error(service_error.internal,
2339                    "Can't make temporary dir")
2340
2341        req = req.get('CreateRequestBody', None)
2342        if not req:
2343            raise service_error(service_error.req,
2344                    "Bad request format (no CreateRequestBody)")
2345        # The tcl parser needs to read a file so put the content into that file
2346        descr=req.get('experimentdescription', None)
2347        if descr:
2348            file_content=descr.get('ns2description', None)
2349            if file_content:
2350                try:
2351                    f = open(tclfile, 'w')
2352                    f.write(file_content)
2353                    f.close()
2354                except IOError:
2355                    raise service_error(service_error.internal,
2356                            "Cannot write temp experiment description")
2357            else:
2358                raise service_error(service_error.req, 
2359                        "Only ns2descriptions supported")
2360        else:
2361            raise service_error(service_error.req, "No experiment description")
2362
2363        # Generate an ID for the experiment (slice) and a certificate that the
2364        # allocator can use to prove they own it.  We'll ship it back through
2365        # the encrypted connection.
2366        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2367
2368        if req.has_key('experimentID') and \
2369                req['experimentID'].has_key('localname'):
2370            overwrite = False
2371            eid = req['experimentID']['localname']
2372            # If there's an old failed experiment here with the same local name
2373            # and accessible by this user, we'll overwrite it, otherwise we'll
2374            # fall through and do the collision avoidance.
2375            old_expid = self.get_experiment_fedid(eid)
2376            if old_expid and self.check_experiment_access(fid, old_expid):
2377                self.state_lock.acquire()
2378                status = self.state[eid].get('experimentStatus', None)
2379                if status and status == 'failed':
2380                    # remove the old access attribute
2381                    self.auth.unset_attribute(fid, old_expid)
2382                    overwrite = True
2383                    del self.state[eid]
2384                    del self.state[old_expid]
2385                self.state_lock.release()
2386            self.state_lock.acquire()
2387            while (self.state.has_key(eid) and not overwrite):
2388                eid += random.choice(string.ascii_letters)
2389            # Initial state
2390            self.state[eid] = {
2391                    'experimentID' : \
2392                            [ { 'localname' : eid }, {'fedid': expid } ],
2393                    'experimentStatus': 'starting',
2394                    'experimentAccess': { 'X509' : expcert },
2395                    'owner': fid,
2396                    'log' : [],
2397                }
2398            self.state[expid] = self.state[eid]
2399            if self.state_filename: self.write_state()
2400            self.state_lock.release()
2401        else:
2402            eid = self.exp_stem
2403            for i in range(0,5):
2404                eid += random.choice(string.ascii_letters)
2405            self.state_lock.acquire()
2406            while (self.state.has_key(eid)):
2407                eid = self.exp_stem
2408                for i in range(0,5):
2409                    eid += random.choice(string.ascii_letters)
2410            # Initial state
2411            self.state[eid] = {
2412                    'experimentID' : \
2413                            [ { 'localname' : eid }, {'fedid': expid } ],
2414                    'experimentStatus': 'starting',
2415                    'experimentAccess': { 'X509' : expcert },
2416                    'owner': fid,
2417                    'log' : [],
2418                }
2419            self.state[expid] = self.state[eid]
2420            if self.state_filename: self.write_state()
2421            self.state_lock.release()
2422
2423        try: 
2424            # This catches exceptions to clear the placeholder if necessary
2425            try:
2426                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2427            except ValueError:
2428                raise service_error(service_error.server_config, 
2429                        "Bad key type (%s)" % self.ssh_type)
2430
2431            user = req.get('user', None)
2432            if user == None:
2433                raise service_error(service_error.req, "No user")
2434
2435            master = req.get('master', None)
2436            if not master:
2437                raise service_error(service_error.req,
2438                        "No master testbed label")
2439            export_project = req.get('exportProject', None)
2440            if not export_project:
2441                raise service_error(service_error.req, "No export project")
2442           
2443            if self.splitter_url:
2444                self.log.debug("Calling remote splitter at %s" % \
2445                        self.splitter_url)
2446                split_data = self.remote_splitter(self.splitter_url,
2447                        file_content, master)
2448            else:
2449                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2450                    str(self.muxmax), '-m', master]
2451
2452                if self.fedkit:
2453                    tclcmd.append('-k')
2454
2455                if self.gatewaykit:
2456                    tclcmd.append('-K')
2457
2458                tclcmd.extend([pid, gid, eid, tclfile])
2459
2460                self.log.debug("running local splitter %s", " ".join(tclcmd))
2461                # This is just fantastic.  As a side effect the parser copies
2462                # tb_compat.tcl into the current directory, so that directory
2463                # must be writable by the fedd user.  Doing this in the
2464                # temporary subdir ensures this is the case.
2465                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2466                        cwd=tmpdir)
2467                split_data = tclparser.stdout
2468
2469            allocated = { }         # Testbeds we can access
2470            # Allocate IP addresses: The allocator is a buddy system memory
2471            # allocator.  Allocate from the largest substrate to the
2472            # smallest to make the packing more likely to work - i.e.
2473            # avoiding internal fragmentation.
2474            top = topdl.topology_from_xml(file=split_data, top="experiment")
2475            subs = sorted(top.substrates, 
2476                    cmp=lambda x,y: cmp(len(x.interfaces), 
2477                        len(y.interfaces)),
2478                    reverse=True)
2479            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
2480            ifs = { }
2481            hosts = [ ]
2482            # The config urlpath
2483            configpath = "/%s/config" % expid
2484            # The config file system location
2485            configdir ="%s%s" % ( self.repodir, configpath)
2486
2487            for idx, s in enumerate(subs):
2488                a = ips.allocate(len(s.interfaces)+2)
2489                if a :
2490                    base, num = a
2491                    if num < len(s.interfaces) +2 : 
2492                        raise service_error(service_error.internal,
2493                                "Allocator returned wrong number of IPs??")
2494                else:
2495                    raise service_error(service_error.req, 
2496                            "Cannot allocate IP addresses")
2497
2498                base += 1
2499                for i in s.interfaces:
2500                    i.attribute.append(
2501                            topdl.Attribute('ip4_address', 
2502                                "%s" % ip_addr(base)))
2503                    hname = i.element.name[0]
2504                    if ifs.has_key(hname):
2505                        hosts.append("%s\t%s-%s %s-%d" % \
2506                                (ip_addr(base), hname, s.name, hname,
2507                                    ifs[hname]))
2508                    else:
2509                        ifs[hname] = 0
2510                        hosts.append("%s\t%s-%s %s-%d %s" % \
2511                                (ip_addr(base), hname, s.name, hname,
2512                                    ifs[hname], hname))
2513
2514                    ifs[hname] += 1
2515                    base += 1
2516            # save config files
2517            try:
2518                os.makedirs(configdir)
2519            except IOError, e:
2520                raise service_error(
2521                        "Cannot create config directory: %s" % e)
2522            # Find the testbeds to look up
2523            testbeds = set([ a.value for e in top.elements \
2524                    for a in e.attribute \
2525                        if a.attribute == 'testbed'] )
2526
2527
2528            # Make per testbed topologies.  Copy the main topo and remove
2529            # interfaces and nodes that don't live in the testbed.
2530            topo ={ }
2531            for tb in testbeds:
2532                self.get_access(tb, None, user, tbparams, master,
2533                        export_project, access_user)
2534                allocated[tb] = 1
2535                topo[tb] = top.clone()
2536                to_delete = [ ]
2537                for e in topo[tb].elements:
2538                    etb = e.get_attribute('testbed')
2539                    if etb and etb != tb:
2540                        for i in e.interface:
2541                            for s in i.subs:
2542                                try:
2543                                    s.interfaces.remove(i)
2544                                except ValueError:
2545                                    raise service_error(service_error.internal,
2546                                            "Can't remove interface??")
2547                        to_delete.append(e)
2548                for e in to_delete:
2549                    topo[tb].elements.remove(e)
2550                topo[tb].make_indices()
2551
2552                for e in topo[tb].elements:
2553                    if tb == master:
2554                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
2555                    else:
2556                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
2557                    scmd = e.get_attribute('startup')
2558                    if scmd:
2559                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
2560
2561                    e.set_attribute('startup', cmd)
2562                    if self.fedkit: add_kit(e, self.fedkit)
2563
2564            # Copy configuration files into the remote file store
2565            try:
2566                f = open("%s/hosts" % configdir, "w")
2567                f.write('\n'.join(hosts))
2568                f.close()
2569            except IOError, e:
2570                raise service_error(service_error.internal, 
2571                        "Cannot write hosts file: %s" % e)
2572            try:
2573                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
2574                        (configdir, gw_pubkey_base))
2575                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
2576                        (configdir, gw_secretkey_base))
2577            except IOError, e:
2578                raise service_error(service_error.internal, 
2579                        "Cannot copy keyfiles: %s" % e)
2580
2581            # Allow the individual testbeds to access the configuration files.
2582            for tb in tbparams.keys():
2583                asignee = tbparams[tb]['allocID']['fedid']
2584                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2585                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2586                    print "assigned %s/%s" % (configpath, f)
2587
2588            # Now, for each substrate in the main topology, find those that
2589            # have nodes on more than one testbed.  Insert portal nodes
2590            # into the copies of those substrates on the sub topologies.
2591            for s in top.substrates:
2592                # tbs will contain an ip address on this subsrate that is in
2593                # each testbed.
2594                tbs = { }
2595                for i in s.interfaces:
2596                    e = i.element
2597                    tb = e.get_attribute('testbed')
2598                    if tb and not tbs.has_key(tb):
2599                        for i in e.interface:
2600                            if s in i.subs:
2601                                tbs[tb]= i.get_attribute('ip4_address')
2602                if len(tbs) < 2:
2603                    continue
2604
2605                # More than one testbed is on this substrate.  Insert
2606                # some portals into the subtopologies.  st == source testbed,
2607                # dt == destination testbed.
2608                segment_substrate = { }
2609                for st in tbs.keys():
2610                    segment_substrate[st] = { }
2611                    for dt in [ t for t in tbs.keys() if t != st]:
2612                        myname =  "%stunnel" % dt
2613                        desthost  =  "%stunnel" % st
2614                        sproject = tbparams[st].get('project', 'project')
2615                        dproject = tbparams[dt].get('project', 'project')
2616                        mproject = tbparams[master].get('project', 'project')
2617                        sdomain = tbparams[st].get('domain', ".example.com")
2618                        ddomain = tbparams[dt].get('domain', ".example.com")
2619                        mdomain = tbparams[master].get('domain', '.example.com')
2620                        # XXX: active and type need to be unkludged
2621                        active = ("%s" % (st == master))
2622                        if not segment_substrate[st].has_key(dt):
2623                            # Put a substrate and a segment for the connected
2624                            # testbed in there.
2625                            tsubstrate = \
2626                                    topdl.Substrate(name='%s-%s' % (st, dt),
2627                                            attribute= [
2628                                                topdl.Attribute(
2629                                                    attribute='portal',
2630                                                    value='true')
2631                                                ]
2632                                            )
2633                            segment_element = topdl.Segment(
2634                                    id= tbparams[dt]['allocID'],
2635                                    type='emulab',
2636                                    uri = self.tbmap.get(dt, None),
2637                                    interface=[ 
2638                                        topdl.Interface(
2639                                            substrate=tsubstrate.name),
2640                                        ],
2641                                    attribute = [
2642                                        topdl.Attribute(attribute=n, value=v)
2643                                            for n, v in (\
2644                                                ('domain', ddomain),
2645                                                ('experiment', "%s/%s" % \
2646                                                        (dproject, eid)),)
2647                                        ],
2648                                    )
2649                            segment_substrate[st][dt] = tsubstrate
2650                            topo[st].substrates.append(tsubstrate)
2651                            topo[st].elements.append(segment_element)
2652                        portal = topdl.Computer(
2653                                name="%stunnel" % dt,
2654                                attribute=[ 
2655                                    topdl.Attribute(attribute=n,value=v)
2656                                        for n, v in (\
2657                                            ('portal', 'true'),
2658                                            ('domain', sdomain),
2659                                            ('masterdomain', mdomain),
2660                                            ('masterexperiment', "%s/%s" % \
2661                                                    (mproject, eid)),
2662                                            ('experiment', "%s/%s" % \
2663                                                    (sproject, eid)),
2664                                            ('peer', "%s" % desthost),
2665                                            ('peer_segment', "%s" % \
2666                                                    tbparams[dt]['allocID']['fedid']),
2667                                            ('scriptdir', 
2668                                                "/usr/local/federation/bin"),
2669                                            ('active', "%s" % active),
2670                                            ('type', 'both'), 
2671                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), sdomain.lower())))
2672                                    ],
2673                                interface=[
2674                                    topdl.Interface(
2675                                        substrate=s.name,
2676                                        attribute=[ 
2677                                            topdl.Attribute(
2678                                                attribute='ip4_addreess', 
2679                                                value=tbs[dt]
2680                                            )
2681                                        ]),
2682                                    topdl.Interface(
2683                                        substrate=\
2684                                            segment_substrate[st][dt].name,
2685                                        attribute=[
2686                                            topdl.Attribute(attribute='portal',
2687                                                value='true')
2688                                            ]
2689                                        ),
2690                                    ],
2691                                )
2692                        if self.fedkit: add_kit(portal, self.fedkit)
2693                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
2694
2695                        topo[st].elements.append(portal)
2696
2697            # Connect the gateway nodes into the topologies and clear out
2698            # substrates that are not in the topologies
2699            for tb in testbeds:
2700                topo[tb].incorporate_elements()
2701                topo[tb].substrates = \
2702                        [s for s in topo[tb].substrates \
2703                            if len(s.interfaces) >0]
2704
2705            # Copy the rpms and tarfiles to a distribution directory from
2706            # which the federants can retrieve them
2707            linkpath = "%s/software" %  expid
2708            softdir ="%s/%s" % ( self.repodir, linkpath)
2709            softmap = { }
2710            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
2711                    for p, t in l ])
2712            pkgs.update([x.location for e in top.elements \
2713                    for x in e.software])
2714            try:
2715                os.makedirs(softdir)
2716            except IOError, e:
2717                raise service_error(
2718                        "Cannot create software directory: %s" % e)
2719            for pkg in pkgs:
2720                loc = pkg
2721
2722                scheme, host, path = urlparse(loc)[0:3]
2723                dest = os.path.basename(path)
2724                if not scheme:
2725                    if not loc.startswith('/'):
2726                        loc = "/%s" % loc
2727                    loc = "file://%s" %loc
2728                try:
2729                    u = urlopen(loc)
2730                except Exception, e:
2731                    raise service_error(service_error.req, 
2732                            "Cannot open %s: %s" % (loc, e))
2733                try:
2734                    f = open("%s/%s" % (softdir, dest) , "w")
2735                    self.log.debug("Writing %s/%s" % (softdir,dest) )
2736                    data = u.read(4096)
2737                    while data:
2738                        f.write(data)
2739                        data = u.read(4096)
2740                    f.close()
2741                    u.close()
2742                except Exception, e:
2743                    raise service_error(service_error.internal,
2744                            "Could not copy %s: %s" % (loc, e))
2745                path = re.sub("/tmp", "", linkpath)
2746                # XXX
2747                softmap[pkg] = \
2748                        "https://users.isi.deterlab.net:23232/%s/%s" %\
2749                        ( path, dest)
2750
2751                # Allow the individual testbeds to access the software.
2752                for tb in tbparams.keys():
2753                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
2754                            "/%s/%s" % ( path, dest))
2755
2756            # Convert the software locations in the segments into the local
2757            # copies on this host
2758            for soft in [ s for tb in topo.values() \
2759                    for e in tb.elements \
2760                        if getattr(e, 'software', False) \
2761                            for s in e.software ]:
2762                if softmap.has_key(soft.location):
2763                    soft.location = softmap[soft.location]
2764
2765            vtopo = topdl.topology_to_vtopo(top)
2766            vis = self.genviz(vtopo)
2767
2768            # save federant information
2769            for k in allocated.keys():
2770                tbparams[k]['federant'] = {\
2771                        'name': [ { 'localname' : eid} ],\
2772                        'emulab': tbparams[k]['emulab'],\
2773                        'allocID' : tbparams[k]['allocID'],\
2774                        'master' : k == master,\
2775                    }
2776
2777            self.state_lock.acquire()
2778            self.state[eid]['vtopo'] = vtopo
2779            self.state[eid]['vis'] = vis
2780            self.state[expid]['federant'] = \
2781                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2782                        if tbparams[tb].has_key('federant') ]
2783            if self.state_filename: 
2784                self.write_state()
2785            self.state_lock.release()
2786        except service_error, e:
2787            # If something goes wrong in the parse (usually an access error)
2788            # clear the placeholder state.  From here on out the code delays
2789            # exceptions.  Failing at this point returns a fault to the remote
2790            # caller.
2791
2792            self.state_lock.acquire()
2793            del self.state[eid]
2794            del self.state[expid]
2795            if self.state_filename: self.write_state()
2796            self.state_lock.release()
2797            raise e
2798
2799
2800        # Start the background swapper and return the starting state.  From
2801        # here on out, the state will stick around a while.
2802
2803        # Let users touch the state
2804        self.auth.set_attribute(fid, expid)
2805        self.auth.set_attribute(expid, expid)
2806        # Override fedids can manipulate state as well
2807        for o in self.overrides:
2808            self.auth.set_attribute(o, expid)
2809
2810        # Create a logger that logs to the experiment's state object as well as
2811        # to the main log file.
2812        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2813        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2814        # XXX: there should be a global one of these rather than repeating the
2815        # code.
2816        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2817                    '%d %b %y %H:%M:%S'))
2818        alloc_log.addHandler(h)
2819       
2820        # XXX
2821        url_base = 'https://users.isi.deterlab.net:23232'
2822        attrs = [ 
2823                {
2824                    'attribute': 'ssh_pubkey', 
2825                    'value': '%s/%s/config/%s' % \
2826                            (url_base, expid, gw_pubkey_base)
2827                },
2828                {
2829                    'attribute': 'ssh_secretkey', 
2830                    'value': '%s/%s/config/%s' % \
2831                            (url_base, expid, gw_secretkey_base)
2832                },
2833                {
2834                    'attribute': 'hosts', 
2835                    'value': '%s/%s/config/hosts' % \
2836                            (url_base, expid)
2837                },
2838            ]
2839
2840        # Start a thread to do the resource allocation
2841        t  = Thread(target=self.new_allocate_resources,
2842                args=(allocated, master, eid, expid, expcert, tbparams, 
2843                    topo, tmpdir, alloc_log, attrs),
2844                name=eid)
2845        t.start()
2846
2847        rv = {
2848                'experimentID': [
2849                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2850                ],
2851                'experimentStatus': 'starting',
2852                'experimentAccess': { 'X509' : expcert }
2853            }
2854
2855        return rv
2856   
2857    def get_experiment_fedid(self, key):
2858        """
2859        find the fedid associated with the localname key in the state database.
2860        """
2861
2862        rv = None
2863        self.state_lock.acquire()
2864        if self.state.has_key(key):
2865            if isinstance(self.state[key], dict):
2866                try:
2867                    kl = [ f['fedid'] for f in \
2868                            self.state[key]['experimentID']\
2869                                if f.has_key('fedid') ]
2870                except KeyError:
2871                    self.state_lock.release()
2872                    raise service_error(service_error.internal, 
2873                            "No fedid for experiment %s when getting "+\
2874                                    "fedid(!?)" % key)
2875                if len(kl) == 1:
2876                    rv = kl[0]
2877                else:
2878                    self.state_lock.release()
2879                    raise service_error(service_error.internal, 
2880                            "multiple fedids for experiment %s when " +\
2881                                    "getting fedid(!?)" % key)
2882            else:
2883                self.state_lock.release()
2884                raise service_error(service_error.internal, 
2885                        "Unexpected state for %s" % key)
2886        self.state_lock.release()
2887        return rv
2888
2889    def check_experiment_access(self, fid, key):
2890        """
2891        Confirm that the fid has access to the experiment.  Though a request
2892        may be made in terms of a local name, the access attribute is always
2893        the experiment's fedid.
2894        """
2895        if not isinstance(key, fedid):
2896            key = self.get_experiment_fedid(key)
2897
2898        if self.auth.check_attribute(fid, key):
2899            return True
2900        else:
2901            raise service_error(service_error.access, "Access Denied")
2902
2903
2904    def get_handler(self, path, fid):
2905        print "%s" %  path
2906        if self.auth.check_attribute(fid, path):
2907            return ("%s/%s" % (self.repodir, path), "application/binary")
2908        else:
2909            return (None, None)
2910
2911    def get_vtopo(self, req, fid):
2912        """
2913        Return the stored virtual topology for this experiment
2914        """
2915        rv = None
2916        state = None
2917
2918        req = req.get('VtopoRequestBody', None)
2919        if not req:
2920            raise service_error(service_error.req,
2921                    "Bad request format (no VtopoRequestBody)")
2922        exp = req.get('experiment', None)
2923        if exp:
2924            if exp.has_key('fedid'):
2925                key = exp['fedid']
2926                keytype = "fedid"
2927            elif exp.has_key('localname'):
2928                key = exp['localname']
2929                keytype = "localname"
2930            else:
2931                raise service_error(service_error.req, "Unknown lookup type")
2932        else:
2933            raise service_error(service_error.req, "No request?")
2934
2935        self.check_experiment_access(fid, key)
2936
2937        self.state_lock.acquire()
2938        if self.state.has_key(key):
2939            if self.state[key].has_key('vtopo'):
2940                rv = { 'experiment' : {keytype: key },\
2941                        'vtopo': self.state[key]['vtopo'],\
2942                    }
2943            else:
2944                state = self.state[key]['experimentStatus']
2945        self.state_lock.release()
2946
2947        if rv: return rv
2948        else: 
2949            if state:
2950                raise service_error(service_error.partial, 
2951                        "Not ready: %s" % state)
2952            else:
2953                raise service_error(service_error.req, "No such experiment")
2954
2955    def get_vis(self, req, fid):
2956        """
2957        Return the stored visualization for this experiment
2958        """
2959        rv = None
2960        state = None
2961
2962        req = req.get('VisRequestBody', None)
2963        if not req:
2964            raise service_error(service_error.req,
2965                    "Bad request format (no VisRequestBody)")
2966        exp = req.get('experiment', None)
2967        if exp:
2968            if exp.has_key('fedid'):
2969                key = exp['fedid']
2970                keytype = "fedid"
2971            elif exp.has_key('localname'):
2972                key = exp['localname']
2973                keytype = "localname"
2974            else:
2975                raise service_error(service_error.req, "Unknown lookup type")
2976        else:
2977            raise service_error(service_error.req, "No request?")
2978
2979        self.check_experiment_access(fid, key)
2980
2981        self.state_lock.acquire()
2982        if self.state.has_key(key):
2983            if self.state[key].has_key('vis'):
2984                rv =  { 'experiment' : {keytype: key },\
2985                        'vis': self.state[key]['vis'],\
2986                        }
2987            else:
2988                state = self.state[key]['experimentStatus']
2989        self.state_lock.release()
2990
2991        if rv: return rv
2992        else:
2993            if state:
2994                raise service_error(service_error.partial, 
2995                        "Not ready: %s" % state)
2996            else:
2997                raise service_error(service_error.req, "No such experiment")
2998
2999    def clean_info_response(self, rv):
3000        """
3001        Remove the information in the experiment's state object that is not in
3002        the info response.
3003        """
3004        # Remove the owner info (should always be there, but...)
3005        if rv.has_key('owner'): del rv['owner']
3006
3007        # Convert the log into the allocationLog parameter and remove the
3008        # log entry (with defensive programming)
3009        if rv.has_key('log'):
3010            rv['allocationLog'] = "".join(rv['log'])
3011            del rv['log']
3012        else:
3013            rv['allocationLog'] = ""
3014
3015        if rv['experimentStatus'] != 'active':
3016            if rv.has_key('federant'): del rv['federant']
3017        else:
3018            # remove the allocationID info from each federant
3019            for f in rv.get('federant', []):
3020                if f.has_key('allocID'): del f['allocID']
3021        return rv
3022
3023    def get_info(self, req, fid):
3024        """
3025        Return all the stored info about this experiment
3026        """
3027        rv = None
3028
3029        req = req.get('InfoRequestBody', None)
3030        if not req:
3031            raise service_error(service_error.req,
3032                    "Bad request format (no InfoRequestBody)")
3033        exp = req.get('experiment', None)
3034        if exp:
3035            if exp.has_key('fedid'):
3036                key = exp['fedid']
3037                keytype = "fedid"
3038            elif exp.has_key('localname'):
3039                key = exp['localname']
3040                keytype = "localname"
3041            else:
3042                raise service_error(service_error.req, "Unknown lookup type")
3043        else:
3044            raise service_error(service_error.req, "No request?")
3045
3046        self.check_experiment_access(fid, key)
3047
3048        # The state may be massaged by the service function that called
3049        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
3050        # state.
3051        self.state_lock.acquire()
3052        if self.state.has_key(key):
3053            rv = copy.deepcopy(self.state[key])
3054        self.state_lock.release()
3055
3056        if rv:
3057            return self.clean_info_response(rv)
3058        else:
3059            raise service_error(service_error.req, "No such experiment")
3060
3061    def get_multi_info(self, req, fid):
3062        """
3063        Return all the stored info that this fedid can access
3064        """
3065        rv = { 'info': [ ] }
3066
3067        self.state_lock.acquire()
3068        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
3069            self.check_experiment_access(fid, key)
3070
3071            if self.state.has_key(key):
3072                e = copy.deepcopy(self.state[key])
3073                e = self.clean_info_response(e)
3074                rv['info'].append(e)
3075        self.state_lock.release()
3076        return rv
3077
3078
3079    def terminate_experiment(self, req, fid):
3080        """
3081        Swap this experiment out on the federants and delete the shared
3082        information
3083        """
3084        tbparams = { }
3085        req = req.get('TerminateRequestBody', None)
3086        if not req:
3087            raise service_error(service_error.req,
3088                    "Bad request format (no TerminateRequestBody)")
3089        force = req.get('force', False)
3090        exp = req.get('experiment', None)
3091        if exp:
3092            if exp.has_key('fedid'):
3093                key = exp['fedid']
3094                keytype = "fedid"
3095            elif exp.has_key('localname'):
3096                key = exp['localname']
3097                keytype = "localname"
3098            else:
3099                raise service_error(service_error.req, "Unknown lookup type")
3100        else:
3101            raise service_error(service_error.req, "No request?")
3102
3103        self.check_experiment_access(fid, key)
3104
3105        dealloc_list = [ ]
3106
3107
3108        # Create a logger that logs to the dealloc_list as well as to the main
3109        # log file.
3110        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3111        h = logging.StreamHandler(self.list_log(dealloc_list))
3112        # XXX: there should be a global one of these rather than repeating the
3113        # code.
3114        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3115                    '%d %b %y %H:%M:%S'))
3116        dealloc_log.addHandler(h)
3117
3118        self.state_lock.acquire()
3119        fed_exp = self.state.get(key, None)
3120
3121        if fed_exp:
3122            # This branch of the conditional holds the lock to generate a
3123            # consistent temporary tbparams variable to deallocate experiments.
3124            # It releases the lock to do the deallocations and reacquires it to
3125            # remove the experiment state when the termination is complete.
3126
3127            # First make sure that the experiment creation is complete.
3128            status = fed_exp.get('experimentStatus', None)
3129
3130            if status:
3131                if status in ('starting', 'terminating'):
3132                    if not force:
3133                        self.state_lock.release()
3134                        raise service_error(service_error.partial, 
3135                                'Experiment still being created or destroyed')
3136                    else:
3137                        self.log.warning('Experiment in %s state ' % status + \
3138                                'being terminated by force.')
3139            else:
3140                # No status??? trouble
3141                self.state_lock.release()
3142                raise service_error(service_error.internal,
3143                        "Experiment has no status!?")
3144
3145            ids = []
3146            #  experimentID is a list of dicts that are self-describing
3147            #  identifiers.  This finds all the fedids and localnames - the
3148            #  keys of self.state - and puts them into ids.
3149            for id in fed_exp.get('experimentID', []):
3150                if id.has_key('fedid'): ids.append(id['fedid'])
3151                if id.has_key('localname'): ids.append(id['localname'])
3152
3153            # Construct enough of the tbparams to make the stop_segment calls
3154            # work
3155            for fed in fed_exp.get('federant', []):
3156                try:
3157                    for e in fed['name']:
3158                        eid = e.get('localname', None)
3159                        if eid: break
3160                    else:
3161                        continue
3162
3163                    p = fed['emulab']['project']
3164
3165                    project = p['name']['localname']
3166                    tb = p['testbed']['localname']
3167                    user = p['user'][0]['userID']['localname']
3168
3169                    domain = fed['emulab']['domain']
3170                    host  = fed['emulab']['ops']
3171                    aid = fed['allocID']
3172                except KeyError, e:
3173                    continue
3174                tbparams[tb] = {\
3175                        'user': user,\
3176                        'domain': domain,\
3177                        'project': project,\
3178                        'host': host,\
3179                        'eid': eid,\
3180                        'aid': aid,\
3181                    }
3182            fed_exp['experimentStatus'] = 'terminating'
3183            if self.state_filename: self.write_state()
3184            self.state_lock.release()
3185
3186            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3187            # then completes, so we can't wait if nothing starts.  So, no
3188            # tbparams, no start.
3189            if len(tbparams) > 0:
3190                thread_pool = self.thread_pool(self.nthreads)
3191                for tb in tbparams.keys():
3192                    # Create and start a thread to stop the segment
3193                    thread_pool.wait_for_slot()
3194                    t  = self.pooled_thread(\
3195                            target=self.stop_segment(log=dealloc_log,
3196                                keyfile=self.ssh_privkey_file, debug=self.debug), 
3197                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
3198                            pdata=thread_pool, trace_file=self.trace_file)
3199                    t.start()
3200                # Wait for completions
3201                thread_pool.wait_for_all_done()
3202
3203            # release the allocations (failed experiments have done this
3204            # already, and starting experiments may be in odd states, so we
3205            # ignore errors releasing those allocations
3206            try: 
3207                for tb in tbparams.keys():
3208                    self.release_access(tb, tbparams[tb]['aid'])
3209            except service_error, e:
3210                if status != 'failed' and not force:
3211                    raise e
3212
3213            # Remove the terminated experiment
3214            self.state_lock.acquire()
3215            for id in ids:
3216                if self.state.has_key(id): del self.state[id]
3217
3218            if self.state_filename: self.write_state()
3219            self.state_lock.release()
3220
3221            return { 
3222                    'experiment': exp , 
3223                    'deallocationLog': "".join(dealloc_list),
3224                    }
3225        else:
3226            # Don't forget to release the lock
3227            self.state_lock.release()
3228            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.