source: fedd/federation/experiment_control.py @ 6c57fe9

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 6c57fe9 was 6c57fe9, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint

  • Property mode set to 100644
File size: 123.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_Ns2Split = service_caller('Ns2Split')
200
201    def __init__(self, config=None, auth=None):
202        """
203        Intialize the various attributes, most from the config object
204        """
205
206        def parse_tarfile_list(tf):
207            """
208            Parse a tarfile list from the configuration.  This is a set of
209            paths and tarfiles separated by spaces.
210            """
211            rv = [ ]
212            if tf is not None:
213                tl = tf.split()
214                while len(tl) > 1:
215                    p, t = tl[0:2]
216                    del tl[0:2]
217                    rv.append((p, t))
218            return rv
219
220        self.thread_with_rv = experiment_control_local.pooled_thread
221        self.thread_pool = experiment_control_local.thread_pool
222        self.list_log = experiment_control_local.list_log
223
224        self.cert_file = config.get("experiment_control", "cert_file")
225        if self.cert_file:
226            self.cert_pwd = config.get("experiment_control", "cert_pwd")
227        else:
228            self.cert_file = config.get("globals", "cert_file")
229            self.cert_pwd = config.get("globals", "cert_pwd")
230
231        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
232                or config.get("globals", "trusted_certs")
233
234        self.repodir = config.get("experiment_control", "repodir")
235
236        self.exp_stem = "fed-stem"
237        self.log = logging.getLogger("fedd.experiment_control")
238        set_log_level(config, "experiment_control", self.log)
239        self.muxmax = 2
240        self.nthreads = 2
241        self.randomize_experiments = False
242
243        self.splitter = None
244        self.ssh_keygen = "/usr/bin/ssh-keygen"
245        self.ssh_identity_file = None
246
247
248        self.debug = config.getboolean("experiment_control", "create_debug")
249        self.state_filename = config.get("experiment_control", 
250                "experiment_state")
251        self.splitter_url = config.get("experiment_control", "splitter_uri")
252        self.fedkit = parse_tarfile_list(\
253                config.get("experiment_control", "fedkit"))
254        self.gatewaykit = parse_tarfile_list(\
255                config.get("experiment_control", "gatewaykit"))
256        accessdb_file = config.get("experiment_control", "accessdb")
257
258        self.ssh_pubkey_file = config.get("experiment_control", 
259                "ssh_pubkey_file")
260        self.ssh_privkey_file = config.get("experiment_control",
261                "ssh_privkey_file")
262        # NB for internal master/slave ops, not experiment setup
263        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
264
265        self.overrides = set([])
266        ovr = config.get('experiment_control', 'overrides')
267        if ovr:
268            for o in ovr.split(","):
269                o = o.strip()
270                if o.startswith('fedid:'): o = o[len('fedid:'):]
271                self.overrides.add(fedid(hexstr=o))
272
273        self.state = { }
274        self.state_lock = Lock()
275        self.tclsh = "/usr/local/bin/otclsh"
276        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
277                config.get("experiment_control", "tcl_splitter",
278                        "/usr/testbed/lib/ns2ir/parse.tcl")
279        mapdb_file = config.get("experiment_control", "mapdb")
280        self.trace_file = sys.stderr
281
282        self.def_expstart = \
283                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
284                "/tmp/federate";
285        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
286                "FEDDIR/hosts";
287        self.def_gwstart = \
288                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
289                "/tmp/bridge.log";
290        self.def_mgwstart = \
291                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
292                "/tmp/bridge.log";
293        self.def_gwimage = "FBSD61-TUNNEL2";
294        self.def_gwtype = "pc";
295        self.local_access = { }
296
297        if auth:
298            self.auth = auth
299        else:
300            self.log.error(\
301                    "[access]: No authorizer initialized, creating local one.")
302            auth = authorizer()
303
304
305        if self.ssh_pubkey_file:
306            try:
307                f = open(self.ssh_pubkey_file, 'r')
308                self.ssh_pubkey = f.read()
309                f.close()
310            except IOError:
311                raise service_error(service_error.internal,
312                        "Cannot read sshpubkey")
313        else:
314            raise service_error(service_error.internal, 
315                    "No SSH public key file?")
316
317        if not self.ssh_privkey_file:
318            raise service_error(service_error.internal, 
319                    "No SSH public key file?")
320
321
322        if mapdb_file:
323            self.read_mapdb(mapdb_file)
324        else:
325            self.log.warn("[experiment_control] No testbed map, using defaults")
326            self.tbmap = { 
327                    'deter':'https://users.isi.deterlab.net:23235',
328                    'emulab':'https://users.isi.deterlab.net:23236',
329                    'ucb':'https://users.isi.deterlab.net:23237',
330                    }
331
332        if accessdb_file:
333                self.read_accessdb(accessdb_file)
334        else:
335            raise service_error(service_error.internal,
336                    "No accessdb specified in config")
337
338        # Grab saved state.  OK to do this w/o locking because it's read only
339        # and only one thread should be in existence that can see self.state at
340        # this point.
341        if self.state_filename:
342            self.read_state()
343
344        # Dispatch tables
345        self.soap_services = {\
346                'Create': soap_handler('Create', self.new_create_experiment),
347                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
348                'Vis': soap_handler('Vis', self.get_vis),
349                'Info': soap_handler('Info', self.get_info),
350                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
351                'Terminate': soap_handler('Terminate', 
352                    self.terminate_experiment),
353        }
354
355        self.xmlrpc_services = {\
356                'Create': xmlrpc_handler('Create', self.new_create_experiment),
357                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
358                'Vis': xmlrpc_handler('Vis', self.get_vis),
359                'Info': xmlrpc_handler('Info', self.get_info),
360                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
361                'Terminate': xmlrpc_handler('Terminate',
362                    self.terminate_experiment),
363        }
364
365    def copy_file(self, src, dest, size=1024):
366        """
367        Exceedingly simple file copy.
368        """
369        s = open(src,'r')
370        d = open(dest, 'w')
371
372        buf = "x"
373        while buf != "":
374            buf = s.read(size)
375            d.write(buf)
376        s.close()
377        d.close()
378
379    # Call while holding self.state_lock
380    def write_state(self):
381        """
382        Write a new copy of experiment state after copying the existing state
383        to a backup.
384
385        State format is a simple pickling of the state dictionary.
386        """
387        if os.access(self.state_filename, os.W_OK):
388            self.copy_file(self.state_filename, \
389                    "%s.bak" % self.state_filename)
390        try:
391            f = open(self.state_filename, 'w')
392            pickle.dump(self.state, f)
393        except IOError, e:
394            self.log.error("Can't write file %s: %s" % \
395                    (self.state_filename, e))
396        except pickle.PicklingError, e:
397            self.log.error("Pickling problem: %s" % e)
398        except TypeError, e:
399            self.log.error("Pickling problem (TypeError): %s" % e)
400
401    # Call while holding self.state_lock
402    def read_state(self):
403        """
404        Read a new copy of experiment state.  Old state is overwritten.
405
406        State format is a simple pickling of the state dictionary.
407        """
408       
409        def get_experiment_id(state):
410            """
411            Pull the fedid experimentID out of the saved state.  This is kind
412            of a gross walk through the dict.
413            """
414
415            if state.has_key('experimentID'):
416                for e in state['experimentID']:
417                    if e.has_key('fedid'):
418                        return e['fedid']
419                else:
420                    return None
421            else:
422                return None
423
424        def get_alloc_ids(state):
425            """
426            Pull the fedids of the identifiers of each allocation from the
427            state.  Again, a dict dive that's best isolated.
428            """
429
430            return [ f['allocID']['fedid'] 
431                    for f in state.get('federant',[]) \
432                        if f.has_key('allocID') and \
433                            f['allocID'].has_key('fedid')]
434
435
436        try:
437            f = open(self.state_filename, "r")
438            self.state = pickle.load(f)
439            self.log.debug("[read_state]: Read state from %s" % \
440                    self.state_filename)
441        except IOError, e:
442            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
443                    % (self.state_filename, e))
444        except pickle.UnpicklingError, e:
445            self.log.warning(("[read_state]: No saved state: " + \
446                    "Unpickling failed: %s") % e)
447       
448        for s in self.state.values():
449            try:
450
451                eid = get_experiment_id(s)
452                if eid : 
453                    # Give the owner rights to the experiment
454                    self.auth.set_attribute(s['owner'], eid)
455                    # And holders of the eid as well
456                    self.auth.set_attribute(eid, eid)
457                    # allow overrides to control experiments as well
458                    for o in self.overrides:
459                        self.auth.set_attribute(o, eid)
460                    # Set permissions to allow reading of the software repo, if
461                    # any, as well.
462                    for a in get_alloc_ids(s):
463                        self.auth.set_attribute(a, 'repo/%s' % eid)
464                else:
465                    raise KeyError("No experiment id")
466            except KeyError, e:
467                self.log.warning("[read_state]: State ownership or identity " +\
468                        "misformatted in %s: %s" % (self.state_filename, e))
469
470
471    def read_accessdb(self, accessdb_file):
472        """
473        Read the mapping from fedids that can create experiments to their name
474        in the 3-level access namespace.  All will be asserted from this
475        testbed and can include the local username and porject that will be
476        asserted on their behalf by this fedd.  Each fedid is also added to the
477        authorization system with the "create" attribute.
478        """
479        self.accessdb = {}
480        # These are the regexps for parsing the db
481        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
482        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
483                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
484        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
485                "\s*->\s*(" + name_expr + ")\s*$")
486        lineno = 0
487
488        # Parse the mappings and store in self.authdb, a dict of
489        # fedid -> (proj, user)
490        try:
491            f = open(accessdb_file, "r")
492            for line in f:
493                lineno += 1
494                line = line.strip()
495                if len(line) == 0 or line.startswith('#'):
496                    continue
497                m = project_line.match(line)
498                if m:
499                    fid = fedid(hexstr=m.group(1))
500                    project, user = m.group(2,3)
501                    if not self.accessdb.has_key(fid):
502                        self.accessdb[fid] = []
503                    self.accessdb[fid].append((project, user))
504                    continue
505
506                m = user_line.match(line)
507                if m:
508                    fid = fedid(hexstr=m.group(1))
509                    project = None
510                    user = m.group(2)
511                    if not self.accessdb.has_key(fid):
512                        self.accessdb[fid] = []
513                    self.accessdb[fid].append((project, user))
514                    continue
515                self.log.warn("[experiment_control] Error parsing access " +\
516                        "db %s at line %d" %  (accessdb_file, lineno))
517        except IOError:
518            raise service_error(service_error.internal,
519                    "Error opening/reading %s as experiment " +\
520                            "control accessdb" %  accessdb_file)
521        f.close()
522
523        # Initialize the authorization attributes
524        for fid in self.accessdb.keys():
525            self.auth.set_attribute(fid, 'create')
526
527    def read_mapdb(self, file):
528        """
529        Read a simple colon separated list of mappings for the
530        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
531        """
532
533        self.tbmap = { }
534        lineno =0
535        try:
536            f = open(file, "r")
537            for line in f:
538                lineno += 1
539                line = line.strip()
540                if line.startswith('#') or len(line) == 0:
541                    continue
542                try:
543                    label, url = line.split(':', 1)
544                    self.tbmap[label] = url
545                except ValueError, e:
546                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
547                            "map db: %s %s" % (lineno, line, e))
548        except IOError, e:
549            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
550                    "open %s: %s" % (file, e))
551        f.close()
552
553    class emulab_segment:
554        def __init__(self, log=None, keyfile=None, debug=False):
555            self.log = log or logging.getLogger(\
556                    'fedd.experiment_control.emulab_segment')
557            self.ssh_privkey_file = keyfile
558            self.debug = debug
559            self.ssh_exec="/usr/bin/ssh"
560            self.scp_exec = "/usr/bin/scp"
561            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
562
563        def scp_file(self, file, user, host, dest=""):
564            """
565            scp a file to the remote host.  If debug is set the action is only
566            logged.
567            """
568
569            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
570                    '-o', 'StrictHostKeyChecking yes', '-i', 
571                    self.ssh_privkey_file, file, 
572                    "%s@%s:%s" % (user, host, dest)]
573            rv = 0
574
575            try:
576                dnull = open("/dev/null", "w")
577            except IOError:
578                self.log.debug("[ssh_file]: failed to open " + \
579                        "/dev/null for redirect")
580                dnull = Null
581
582            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
583            if not self.debug:
584                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
585                        close_fds=True)
586
587            return rv == 0
588
589        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
590            """
591            Run a remote command on host as user.  If debug is set, the action
592            is only logged.  Commands are run without stdin, to avoid stray
593            SIGTTINs.
594            """
595            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
596                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
597                    (self.ssh_exec, self.ssh_privkey_file, 
598                            user, host, cmd)
599
600            try:
601                dnull = open("/dev/null", "w")
602            except IOError:
603                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
604                        "for redirect")
605                dnull = Null
606
607            self.log.debug("[ssh_cmd]: %s" % sh_str)
608            if not self.debug:
609                if dnull:
610                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
611                            close_fds=True)
612                else:
613                    sub = Popen(sh_str, shell=True,
614                            close_fds=True)
615                if timeout:
616                    i = 0
617                    rv = sub.poll()
618                    while i < timeout:
619                        if rv is not None: break
620                        else:
621                            time.sleep(1)
622                            rv = sub.poll()
623                            i += 1
624                    else:
625                        self.log.debug("Process exceeded runtime: %s" % sh_str)
626                        os.kill(sub.pid, signal.SIGKILL)
627                        raise self.ssh_cmd_timeout();
628                    return rv == 0
629                else:
630                    return sub.wait() == 0
631            else:
632                if timeout == 0:
633                    self.log.debug("debug timeout raised on %s " % sh_str)
634                    raise self.ssh_cmd_timeout()
635                else:
636                    return True
637
638    class start_segment(emulab_segment):
639        def __init__(self, log=None, keyfile=None, debug=False):
640            experiment_control_local.emulab_segment.__init__(self,
641                    log=log, keyfile=keyfile, debug=debug)
642
643        def create_config_tree(self, src_dir, dest_dir, script):
644            """
645            Append commands to script that will create the directory hierarchy
646            on the remote federant.
647            """
648
649            if os.path.isdir(src_dir):
650                print >>script, "mkdir -p %s" % dest_dir
651                print >>script, "chmod 770 %s" % dest_dir
652
653                for f in os.listdir(src_dir):
654                    if os.path.isdir(f):
655                        self.create_config_tree("%s/%s" % (src_dir, f), 
656                                "%s/%s" % (dest_dir, f), script)
657            else:
658                self.log.debug("[create_config_tree]: Not a directory: %s" \
659                        % src_dir)
660
661        def ship_configs(self, host, user, src_dir, dest_dir):
662            """
663            Copy federant-specific configuration files to the federant.
664            """
665            for f in os.listdir(src_dir):
666                if os.path.isdir(f):
667                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
668                            "%s/%s" % (dest_dir, f)):
669                        return False
670                else:
671                    if not self.scp_file("%s/%s" % (src_dir, f), 
672                            user, host, dest_dir):
673                        return False
674            return True
675
676        def get_state(self, user, host, tb, pid, eid):
677            # command to test experiment state
678            expinfo_exec = "/usr/testbed/bin/expinfo" 
679            # Regular expressions to parse the expinfo response
680            state_re = re.compile("State:\s+(\w+)")
681            no_exp_re = re.compile("^No\s+such\s+experiment")
682            swapping_re = re.compile("^No\s+information\s+available.")
683            state = None    # Experiment state parsed from expinfo
684            # The expinfo ssh command.  Note the identity restriction to use
685            # only the identity provided in the pubkey given.
686            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
687                    'StrictHostKeyChecking yes', '-i', 
688                    self.ssh_privkey_file, "%s@%s" % (user, host), 
689                    expinfo_exec, pid, eid]
690
691            dev_null = None
692            try:
693                dev_null = open("/dev/null", "a")
694            except IOError, e:
695                self.log.error("[get_state]: can't open /dev/null: %s" %e)
696
697            if self.debug:
698                state = 'swapped'
699                rv = 0
700            else:
701                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
702                        close_fds=True)
703                for line in status.stdout:
704                    m = state_re.match(line)
705                    if m: state = m.group(1)
706                    else:
707                        for reg, st in ((no_exp_re, "none"),
708                                (swapping_re, "swapping")):
709                            m = reg.match(line)
710                            if m: state = st
711                rv = status.wait()
712
713            # If the experiment is not present the subcommand returns a
714            # non-zero return value.  If we successfully parsed a "none"
715            # outcome, ignore the return code.
716            if rv != 0 and state != 'none':
717                raise service_error(service_error.internal,
718                        "Cannot get status of segment %s:%s/%s" % \
719                                (tb, pid, eid))
720            elif state not in ('active', 'swapped', 'swapping', 'none'):
721                raise service_error(service_error.internal,
722                        "Cannot get status of segment %s:%s/%s" % \
723                                (tb, pid, eid))
724            else: return state
725
726
727        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
728            """
729            Start a sub-experiment on a federant.
730
731            Get the current state, modify or create as appropriate, ship data
732            and configs and start the experiment.  There are small ordering
733            differences based on the initial state of the sub-experiment.
734            """
735            # ops node in the federant
736            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
737            user = tbparams[tb]['user']     # federant user
738            pid = tbparams[tb]['project']   # federant project
739            # XXX
740            base_confs = ( "hosts",)
741            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
742            # Configuration directories on the remote machine
743            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
744            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
745            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
746
747            state = self.get_state(user, host, tb, pid, eid)
748
749            self.log.debug("[start_segment]: %s: %s" % (tb, state))
750            self.log.info("[start_segment]:transferring experiment to %s" % tb)
751
752            if not self.scp_file("%s/%s/%s" % \
753                    (tmpdir, tb, tclfile), user, host):
754                return False
755           
756            if state == 'none':
757                # Create a null copy of the experiment so that we capture any
758                # logs there if the modify fails.  Emulab software discards the
759                # logs from a failed startexp
760                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
761                    return False
762                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
763                timedout = False
764                try:
765                    if not self.ssh_cmd(user, host,
766                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
767                            "-e %s null.tcl") % (pid, eid), "startexp",
768                            timeout=60 * 10):
769                        return False
770                except self.ssh_cmd_timeout:
771                    timedout = True
772
773                if timedout:
774                    state = self.get_state(user, host, tb, pid, eid)
775                    if state != "swapped":
776                        return False
777
778           
779            # Open up a temporary file to contain a script for setting up the
780            # filespace for the new experiment.
781            self.log.info("[start_segment]: creating script file")
782            try:
783                sf, scriptname = tempfile.mkstemp()
784                scriptfile = os.fdopen(sf, 'w')
785            except IOError:
786                return False
787
788            scriptbase = os.path.basename(scriptname)
789
790            # Script the filesystem changes
791            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
792            # Clear and create the tarfiles and rpm directories
793            for d in (tarfiles_dir, rpms_dir):
794                print >>scriptfile, "/bin/rm -rf %s/*" % d
795                print >>scriptfile, "mkdir -p %s" % d
796            print >>scriptfile, 'mkdir -p %s' % proj_dir
797            self.create_config_tree("%s/%s" % (tmpdir, tb),
798                    proj_dir, scriptfile)
799            if os.path.isdir("%s/tarfiles" % tmpdir):
800                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
801                        scriptfile)
802            if os.path.isdir("%s/rpms" % tmpdir):
803                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
804                        scriptfile)
805            print >>scriptfile, "rm -f %s" % scriptbase
806            scriptfile.close()
807
808            # Move the script to the remote machine
809            # XXX: could collide tempfile names on the remote host
810            if self.scp_file(scriptname, user, host, scriptbase):
811                os.remove(scriptname)
812            else:
813                return False
814
815            # Execute the script (and the script's last line deletes it)
816            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
817                return False
818
819            for f in base_confs:
820                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
821                        "%s/%s" % (proj_dir, f)):
822                    return False
823            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
824                    proj_dir):
825                return False
826            if os.path.isdir("%s/tarfiles" % tmpdir):
827                if not self.ship_configs(host, user,
828                        "%s/tarfiles" % tmpdir, tarfiles_dir):
829                    return False
830            if os.path.isdir("%s/rpms" % tmpdir):
831                if not self.ship_configs(host, user,
832                        "%s/rpms" % tmpdir, tarfiles_dir):
833                    return False
834            # Stage the new configuration (active experiments will stay swapped
835            # in now)
836            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
837            try:
838                if not self.ssh_cmd(user, host,
839                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
840                                (pid, eid, tclfile),
841                        "modexp", timeout= 60 * 10):
842                    return False
843            except self.ssh_cmd_timeout:
844                self.log.error("Modify command failed to complete in time")
845                # There's really no way to see if this succeeded or failed, so
846                # if it hangs, assume the worst.
847                return False
848            # Active experiments are still swapped, this swaps the others in.
849            if state != 'active':
850                self.log.info("[start_segment]: Swapping %s in on %s" % \
851                        (eid, tb))
852                timedout = False
853                try:
854                    if not self.ssh_cmd(user, host,
855                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
856                            "swapexp", timeout=10*60):
857                        return False
858                except self.ssh_cmd_timeout:
859                    timedout = True
860               
861                # If the command was terminated, but completed successfully,
862                # report success.
863                if timedout:
864                    self.log.debug("[start_segment]: swapin timed out " +\
865                            "checking state")
866                    state = self.get_state(user, host, tb, pid, eid)
867                    self.log.debug("[start_segment]: state is %s" % state)
868                    return state == 'active'
869            # Everything has gone OK.
870            return True
871
872    class stop_segment(emulab_segment):
873        def __init__(self, log=None, keyfile=None, debug=False):
874            experiment_control_local.emulab_segment.__init__(self,
875                    log=log, keyfile=keyfile, debug=debug)
876
877        def __call__(self, tb, eid, tbparams):
878            """
879            Stop a sub experiment by calling swapexp on the federant
880            """
881            user = tbparams[tb]['user']
882            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
883            pid = tbparams[tb]['project']
884
885            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
886            rv = False
887            try:
888                # Clean out tar files: we've gone over quota in the past
889                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
890                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
891                        (pid, eid))
892                rv = self.ssh_cmd(user, host,
893                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
894            except self.ssh_cmd_timeout:
895                rv = False
896            return rv
897
898       
899    def generate_ssh_keys(self, dest, type="rsa" ):
900        """
901        Generate a set of keys for the gateways to use to talk.
902
903        Keys are of type type and are stored in the required dest file.
904        """
905        valid_types = ("rsa", "dsa")
906        t = type.lower();
907        if t not in valid_types: raise ValueError
908        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
909
910        try:
911            trace = open("/dev/null", "w")
912        except IOError:
913            raise service_error(service_error.internal,
914                    "Cannot open /dev/null??");
915
916        # May raise CalledProcessError
917        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
918        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
919        if rv != 0:
920            raise service_error(service_error.internal, 
921                    "Cannot generate nonce ssh keys.  %s return code %d" \
922                            % (self.ssh_keygen, rv))
923
924    def gentopo(self, str):
925        """
926        Generate the topology dtat structure from the splitter's XML
927        representation of it.
928
929        The topology XML looks like:
930            <experiment>
931                <nodes>
932                    <node><vname></vname><ips>ip1:ip2</ips></node>
933                </nodes>
934                <lans>
935                    <lan>
936                        <vname></vname><vnode></vnode><ip></ip>
937                        <bandwidth></bandwidth><member>node:port</member>
938                    </lan>
939                </lans>
940        """
941        class topo_parse:
942            """
943            Parse the topology XML and create the dats structure.
944            """
945            def __init__(self):
946                # Typing of the subelements for data conversion
947                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
948                self.int_subelements = ( 'bandwidth',)
949                self.float_subelements = ( 'delay',)
950                # The final data structure
951                self.nodes = [ ]
952                self.lans =  [ ]
953                self.topo = { \
954                        'node': self.nodes,\
955                        'lan' : self.lans,\
956                    }
957                self.element = { }  # Current element being created
958                self.chars = ""     # Last text seen
959
960            def end_element(self, name):
961                # After each sub element the contents is added to the current
962                # element or to the appropriate list.
963                if name == 'node':
964                    self.nodes.append(self.element)
965                    self.element = { }
966                elif name == 'lan':
967                    self.lans.append(self.element)
968                    self.element = { }
969                elif name in self.str_subelements:
970                    self.element[name] = self.chars
971                    self.chars = ""
972                elif name in self.int_subelements:
973                    self.element[name] = int(self.chars)
974                    self.chars = ""
975                elif name in self.float_subelements:
976                    self.element[name] = float(self.chars)
977                    self.chars = ""
978
979            def found_chars(self, data):
980                self.chars += data.rstrip()
981
982
983        tp = topo_parse();
984        parser = xml.parsers.expat.ParserCreate()
985        parser.EndElementHandler = tp.end_element
986        parser.CharacterDataHandler = tp.found_chars
987
988        parser.Parse(str)
989
990        return tp.topo
991       
992
993    def genviz(self, topo):
994        """
995        Generate the visualization the virtual topology
996        """
997
998        neato = "/usr/local/bin/neato"
999        # These are used to parse neato output and to create the visualization
1000        # file.
1001        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
1002        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
1003                "%s</type></node>"
1004
1005        try:
1006            # Node names
1007            nodes = [ n['vname'] for n in topo['node'] ]
1008            topo_lans = topo['lan']
1009        except KeyError, e:
1010            raise service_error(service_error.internal, "Bad topology: %s" %e)
1011
1012        lans = { }
1013        links = { }
1014
1015        # Walk through the virtual topology, organizing the connections into
1016        # 2-node connections (links) and more-than-2-node connections (lans).
1017        # When a lan is created, it's added to the list of nodes (there's a
1018        # node in the visualization for the lan).
1019        for l in topo_lans:
1020            if links.has_key(l['vname']):
1021                if len(links[l['vname']]) < 2:
1022                    links[l['vname']].append(l['vnode'])
1023                else:
1024                    nodes.append(l['vname'])
1025                    lans[l['vname']] = links[l['vname']]
1026                    del links[l['vname']]
1027                    lans[l['vname']].append(l['vnode'])
1028            elif lans.has_key(l['vname']):
1029                lans[l['vname']].append(l['vnode'])
1030            else:
1031                links[l['vname']] = [ l['vnode'] ]
1032
1033
1034        # Open up a temporary file for dot to turn into a visualization
1035        try:
1036            df, dotname = tempfile.mkstemp()
1037            dotfile = os.fdopen(df, 'w')
1038        except IOError:
1039            raise service_error(service_error.internal,
1040                    "Failed to open file in genviz")
1041
1042        try:
1043            dnull = open('/dev/null', 'w')
1044        except IOError:
1045            service_error(service_error.internal,
1046                    "Failed to open /dev/null in genviz")
1047
1048        # Generate a dot/neato input file from the links, nodes and lans
1049        try:
1050            print >>dotfile, "graph G {"
1051            for n in nodes:
1052                print >>dotfile, '\t"%s"' % n
1053            for l in links.keys():
1054                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1055            for l in lans.keys():
1056                for n in lans[l]:
1057                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1058            print >>dotfile, "}"
1059            dotfile.close()
1060        except TypeError:
1061            raise service_error(service_error.internal,
1062                    "Single endpoint link in vtopo")
1063        except IOError:
1064            raise service_error(service_error.internal, "Cannot write dot file")
1065
1066        # Use dot to create a visualization
1067        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1068                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1069                close_fds=True)
1070        dnull.close()
1071
1072        # Translate dot to vis format
1073        vis_nodes = [ ]
1074        vis = { 'node': vis_nodes }
1075        for line in dot.stdout:
1076            m = vis_re.match(line)
1077            if m:
1078                vn = m.group(1)
1079                vis_node = {'name': vn, \
1080                        'x': float(m.group(2)),\
1081                        'y' : float(m.group(3)),\
1082                    }
1083                if vn in links.keys() or vn in lans.keys():
1084                    vis_node['type'] = 'lan'
1085                else:
1086                    vis_node['type'] = 'node'
1087                vis_nodes.append(vis_node)
1088        rv = dot.wait()
1089
1090        os.remove(dotname)
1091        if rv == 0 : return vis
1092        else: return None
1093
1094    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1095            access_user):
1096        """
1097        Get access to testbed through fedd and set the parameters for that tb
1098        """
1099        uri = self.tbmap.get(tb, None)
1100        if not uri:
1101            raise service_error(serice_error.server_config, 
1102                    "Unknown testbed: %s" % tb)
1103
1104        # currently this lumps all users into one service access group
1105        service_keys = [ a for u in user \
1106                for a in u.get('access', []) \
1107                    if a.has_key('sshPubkey')]
1108
1109        if len(service_keys) == 0:
1110            raise service_error(service_error.req, 
1111                    "Must have at least one SSH pubkey for services")
1112
1113
1114        for p, u in access_user:
1115            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1116                    "to %s") %  ((p or "None"), u, uri))
1117
1118            if p:
1119                # Request with user and project specified
1120                req = {\
1121                        'destinationTestbed' : { 'uri' : uri },
1122                        'project': { 
1123                            'name': {'localname': p},
1124                            'user': [ {'userID': { 'localname': u } } ],
1125                            },
1126                        'user':  user,
1127                        'allocID' : { 'localname': 'test' },
1128                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1129                        'serviceAccess' : service_keys
1130                    }
1131            else:
1132                # Request with only user specified
1133                req = {\
1134                        'destinationTestbed' : { 'uri' : uri },
1135                        'user':  [ {'userID': { 'localname': u } } ],
1136                        'allocID' : { 'localname': 'test' },
1137                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1138                        'serviceAccess' : service_keys
1139                    }
1140
1141            if tb == master:
1142                # NB, the export_project parameter is a dict that includes
1143                # the type
1144                req['exportProject'] = export_project
1145
1146            # node resources if any
1147            if nodes != None and len(nodes) > 0:
1148                rnodes = [ ]
1149                for n in nodes:
1150                    rn = { }
1151                    image, hw, count = n.split(":")
1152                    if image: rn['image'] = [ image ]
1153                    if hw: rn['hardware'] = [ hw ]
1154                    if count and int(count) >0 : rn['count'] = int(count)
1155                    rnodes.append(rn)
1156                req['resources']= { }
1157                req['resources']['node'] = rnodes
1158
1159            try:
1160                if self.local_access.has_key(uri):
1161                    # Local access call
1162                    req = { 'RequestAccessRequestBody' : req }
1163                    r = self.local_access[uri].RequestAccess(req, 
1164                            fedid(file=self.cert_file))
1165                    r = { 'RequestAccessResponseBody' : r }
1166                else:
1167                    r = self.call_RequestAccess(uri, req, 
1168                            self.cert_file, self.cert_pwd, self.trusted_certs)
1169            except service_error, e:
1170                if e.code == service_error.access:
1171                    self.log.debug("[get_access] Access denied")
1172                    r = None
1173                    continue
1174                else:
1175                    raise e
1176
1177            if r.has_key('RequestAccessResponseBody'):
1178                # Through to here we have a valid response, not a fault.
1179                # Access denied is a fault, so something better or worse than
1180                # access denied has happened.
1181                r = r['RequestAccessResponseBody']
1182                self.log.debug("[get_access] Access granted")
1183                break
1184            else:
1185                raise service_error(service_error.protocol,
1186                        "Bad proxy response")
1187       
1188        if not r:
1189            raise service_error(service_error.access, 
1190                    "Access denied by %s (%s)" % (tb, uri))
1191
1192        e = r['emulab']
1193        p = e['project']
1194        tbparam[tb] = { 
1195                "boss": e['boss'],
1196                "host": e['ops'],
1197                "domain": e['domain'],
1198                "fs": e['fileServer'],
1199                "eventserver": e['eventServer'],
1200                "project": unpack_id(p['name']),
1201                "emulab" : e,
1202                "allocID" : r['allocID'],
1203                }
1204        # Make the testbed name be the label the user applied
1205        p['testbed'] = {'localname': tb }
1206
1207        for u in p['user']:
1208            role = u.get('role', None)
1209            if role == 'experimentCreation':
1210                tbparam[tb]['user'] = unpack_id(u['userID'])
1211                break
1212        else:
1213            raise service_error(service_error.internal, 
1214                    "No createExperimentUser from %s" %tb)
1215
1216        # Add attributes to barameter space.  We don't allow attributes to
1217        # overlay any parameters already installed.
1218        for a in e['fedAttr']:
1219            try:
1220                if a['attribute'] and isinstance(a['attribute'], basestring)\
1221                        and not tbparam[tb].has_key(a['attribute'].lower()):
1222                    tbparam[tb][a['attribute'].lower()] = a['value']
1223            except KeyError:
1224                self.log.error("Bad attribute in response: %s" % a)
1225       
1226    def release_access(self, tb, aid):
1227        """
1228        Release access to testbed through fedd
1229        """
1230
1231        uri = self.tbmap.get(tb, None)
1232        if not uri:
1233            raise service_error(serice_error.server_config, 
1234                    "Unknown testbed: %s" % tb)
1235
1236        if self.local_access.has_key(uri):
1237            resp = self.local_access[uri].ReleaseAccess(\
1238                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1239                    fedid(file=self.cert_file))
1240            resp = { 'ReleaseAccessResponseBody': resp } 
1241        else:
1242            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1243                    self.cert_file, self.cert_pwd, self.trusted_certs)
1244
1245        # better error coding
1246
1247    def remote_splitter(self, uri, desc, master):
1248
1249        req = {
1250                'description' : { 'ns2description': desc },
1251                'master': master,
1252                'include_fedkit': bool(self.fedkit),
1253                'include_gatewaykit': bool(self.gatewaykit)
1254            }
1255
1256        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1257                self.trusted_certs)
1258
1259        if r.has_key('Ns2SplitResponseBody'):
1260            r = r['Ns2SplitResponseBody']
1261            if r.has_key('output'):
1262                return r['output'].splitlines()
1263            else:
1264                raise service_error(service_error.protocol, 
1265                        "Bad splitter response (no output)")
1266        else:
1267            raise service_error(service_error.protocol, "Bad splitter response")
1268       
1269    class current_testbed:
1270        """
1271        Object for collecting the current testbed description.  The testbed
1272        description is saved to a file with the local testbed variables
1273        subsittuted line by line.
1274        """
1275        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1276            def tar_list_to_string(tl):
1277                if tl is None: return None
1278
1279                rv = ""
1280                for t in tl:
1281                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1282                            (t[0], os.path.basename(t[1]))
1283                return rv
1284
1285
1286            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1287            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1288            self.current_testbed = None
1289            self.testbed_file = None
1290
1291            self.def_expstart = \
1292                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1293            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1294            self.def_gwstart = \
1295                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1296            self.def_mgwstart = \
1297                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1298            self.def_gwimage = "FBSD61-TUNNEL2";
1299            self.def_gwtype = "pc";
1300            self.def_mgwcmd = '# '
1301            self.def_mgwcmdparams = ''
1302            self.def_gwcmd = '# '
1303            self.def_gwcmdparams = ''
1304
1305            self.eid = eid
1306            self.tmpdir = tmpdir
1307            # Convert fedkit and gateway kit (which are lists of tuples) into a
1308            # substituition string.
1309            self.fedkit = tar_list_to_string(fedkit)
1310            self.gatewaykit = tar_list_to_string(gatewaykit)
1311
1312        def __call__(self, line, master, allocated, tbparams):
1313            # Capture testbed topology descriptions
1314            if self.current_testbed == None:
1315                m = self.begin_testbed.match(line)
1316                if m != None:
1317                    self.current_testbed = m.group(1)
1318                    if self.current_testbed == None:
1319                        raise service_error(service_error.req,
1320                                "Bad request format (unnamed testbed)")
1321                    allocated[self.current_testbed] = \
1322                            allocated.get(self.current_testbed,0) + 1
1323                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1324                    if not os.path.exists(tb_dir):
1325                        try:
1326                            os.mkdir(tb_dir)
1327                        except IOError:
1328                            raise service_error(service_error.internal,
1329                                    "Cannot create %s" % tb_dir)
1330                    try:
1331                        self.testbed_file = open("%s/%s.%s.tcl" %
1332                                (tb_dir, self.eid, self.current_testbed), 'w')
1333                    except IOError:
1334                        self.testbed_file = None
1335                    return True
1336                else: return False
1337            else:
1338                m = self.end_testbed.match(line)
1339                if m != None:
1340                    if m.group(1) != self.current_testbed:
1341                        raise service_error(service_error.internal, 
1342                                "Mismatched testbed markers!?")
1343                    if self.testbed_file != None: 
1344                        self.testbed_file.close()
1345                        self.testbed_file = None
1346                    self.current_testbed = None
1347                elif self.testbed_file:
1348                    # Substitute variables and put the line into the local
1349                    # testbed file.
1350                    gwtype = tbparams[self.current_testbed].get(\
1351                            'connectortype', self.def_gwtype)
1352                    gwimage = tbparams[self.current_testbed].get(\
1353                            'connectorimage', self.def_gwimage)
1354                    mgwstart = tbparams[self.current_testbed].get(\
1355                            'masterconnectorstartcmd', self.def_mgwstart)
1356                    mexpstart = tbparams[self.current_testbed].get(\
1357                            'masternodestartcmd', self.def_mexpstart)
1358                    gwstart = tbparams[self.current_testbed].get(\
1359                            'slaveconnectorstartcmd', self.def_gwstart)
1360                    expstart = tbparams[self.current_testbed].get(\
1361                            'slavenodestartcmd', self.def_expstart)
1362                    project = tbparams[self.current_testbed].get('project')
1363                    gwcmd = tbparams[self.current_testbed].get(\
1364                            'slaveconnectorcmd', self.def_gwcmd)
1365                    gwcmdparams = tbparams[self.current_testbed].get(\
1366                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1367                    mgwcmd = tbparams[self.current_testbed].get(\
1368                            'masterconnectorcmd', self.def_gwcmd)
1369                    mgwcmdparams = tbparams[self.current_testbed].get(\
1370                            'masterconnectorcmdparams', self.def_gwcmdparams)
1371                    line = re.sub("GWTYPE", gwtype, line)
1372                    line = re.sub("GWIMAGE", gwimage, line)
1373                    if self.current_testbed == master:
1374                        line = re.sub("GWSTART", mgwstart, line)
1375                        line = re.sub("EXPSTART", mexpstart, line)
1376                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1377                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1378                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1379                    else:
1380                        line = re.sub("GWSTART", gwstart, line)
1381                        line = re.sub("EXPSTART", expstart, line)
1382                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1383                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1384                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1385                    #These expansions contain EID and PROJDIR.  NB these are
1386                    # local fedkit and gatewaykit, which are strings.
1387                    if self.fedkit:
1388                        line = re.sub("FEDKIT", self.fedkit, line)
1389                    if self.gatewaykit:
1390                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1391                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1392                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1393                    line = re.sub("EID", self.eid, line)
1394                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1395                            (project, self.eid), line)
1396                    print >>self.testbed_file, line
1397                return True
1398
1399    class allbeds:
1400        """
1401        Process the Allbeds section.  Get access to each federant and save the
1402        parameters in tbparams
1403        """
1404        def __init__(self, get_access):
1405            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1406            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1407            self.in_allbeds = False
1408            self.get_access = get_access
1409
1410        def __call__(self, line, user, tbparams, master, export_project,
1411                access_user):
1412            # Testbed access parameters
1413            if not self.in_allbeds:
1414                if self.begin_allbeds.match(line):
1415                    self.in_allbeds = True
1416                    return True
1417                else:
1418                    return False
1419            else:
1420                if self.end_allbeds.match(line):
1421                    self.in_allbeds = False
1422                else:
1423                    nodes = line.split('|')
1424                    tb = nodes.pop(0)
1425                    self.get_access(tb, nodes, user, tbparams, master,
1426                            export_project, access_user)
1427                return True
1428
1429    class gateways:
1430        def __init__(self, eid, master, tmpdir, gw_pubkey,
1431                gw_secretkey, copy_file, fedkit):
1432            self.begin_gateways = \
1433                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1434            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1435            self.current_gateways = None
1436            self.control_gateway = None
1437            self.active_end = { }
1438
1439            self.eid = eid
1440            self.master = master
1441            self.tmpdir = tmpdir
1442            self.gw_pubkey_base = gw_pubkey
1443            self.gw_secretkey_base = gw_secretkey
1444
1445            self.copy_file = copy_file
1446            self.fedkit = fedkit
1447
1448
1449        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1450                active_end, tbparams, dtb, myname, desthost, type):
1451            """
1452            Produce a gateway configuration file from a gateways line.
1453            """
1454
1455            sproject = tbparams[gw].get('project', 'project')
1456            dproject = tbparams[dtb].get('project', 'project')
1457            sdomain = ".%s.%s%s" % (eid, sproject,
1458                    tbparams[gw].get('domain', ".example.com"))
1459            ddomain = ".%s.%s%s" % (eid, dproject,
1460                    tbparams[dtb].get('domain', ".example.com"))
1461            boss = tbparams[master].get('boss', "boss")
1462            fs = tbparams[master].get('fs', "fs")
1463            event_server = "%s%s" % \
1464                    (tbparams[gw].get('eventserver', "event_server"),
1465                            tbparams[gw].get('domain', "example.com"))
1466            remote_event_server = "%s%s" % \
1467                    (tbparams[dtb].get('eventserver', "event_server"),
1468                            tbparams[dtb].get('domain', "example.com"))
1469            seer_control = "%s%s" % \
1470                    (tbparams[gw].get('control', "control"), sdomain)
1471            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1472
1473            if self.fedkit:
1474                remote_script_dir = "/usr/local/federation/bin"
1475                local_script_dir = "/usr/local/federation/bin"
1476            else:
1477                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1478                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1479
1480            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1481            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1482            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1483
1484            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1485            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1486
1487            # translate to lower case so the `hostname` hack for specifying
1488            # configuration files works.
1489            conf_file = conf_file.lower();
1490            remote_conf_file = remote_conf_file.lower();
1491
1492            if dtb == master:
1493                active = "false"
1494            elif gw == master:
1495                active = "true"
1496            elif active_end.has_key('%s-%s' % (dtb, gw)):
1497                active = "false"
1498            else:
1499                active_end['%s-%s' % (gw, dtb)] = 1
1500                active = "true"
1501
1502            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1503            print >>gwconfig, "Active: %s" % active
1504            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1505            if tunnel_iface:
1506                print >>gwconfig, "Interface: %s" % tunnel_iface
1507            print >>gwconfig, "BossName: %s" % boss
1508            print >>gwconfig, "FsName: %s" % fs
1509            print >>gwconfig, "EventServerName: %s" % event_server
1510            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1511            print >>gwconfig, "SeerControl: %s" % seer_control
1512            print >>gwconfig, "Type: %s" % type
1513            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1514            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1515                    local_script_dir
1516            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1517            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1518            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1519                    (remote_conf_dir, remote_conf_file)
1520            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1521            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1522            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1523            gwconfig.close()
1524
1525            return active == "true"
1526
1527        def __call__(self, line, allocated, tbparams):
1528            # Process gateways
1529            if not self.current_gateways:
1530                m = self.begin_gateways.match(line)
1531                if m:
1532                    self.current_gateways = m.group(1)
1533                    if allocated.has_key(self.current_gateways):
1534                        # This test should always succeed
1535                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1536                        if not os.path.exists(tb_dir):
1537                            try:
1538                                os.mkdir(tb_dir)
1539                            except IOError:
1540                                raise service_error(service_error.internal,
1541                                        "Cannot create %s" % tb_dir)
1542                    else:
1543                        # XXX
1544                        self.log.error("[gateways]: Ignoring gateways for " + \
1545                                "unknown testbed %s" % self.current_gateways)
1546                        self.current_gateways = None
1547                    return True
1548                else:
1549                    return False
1550            else:
1551                m = self.end_gateways.match(line)
1552                if m :
1553                    if m.group(1) != self.current_gateways:
1554                        raise service_error(service_error.internal,
1555                                "Mismatched gateway markers!?")
1556                    if self.control_gateway:
1557                        try:
1558                            cc = open("%s/%s/client.conf" %
1559                                    (self.tmpdir, self.current_gateways), 'w')
1560                            print >>cc, "ControlGateway: %s" % \
1561                                    self.control_gateway
1562                            if tbparams[self.master].has_key('smbshare'):
1563                                print >>cc, "SMBSHare: %s" % \
1564                                        tbparams[self.master]['smbshare']
1565                            print >>cc, "ProjectUser: %s" % \
1566                                    tbparams[self.master]['user']
1567                            print >>cc, "ProjectName: %s" % \
1568                                    tbparams[self.master]['project']
1569                            print >>cc, "ExperimentID: %s/%s" % \
1570                                    ( tbparams[self.master]['project'], \
1571                                    self.eid )
1572                            cc.close()
1573                        except IOError:
1574                            raise service_error(service_error.internal,
1575                                    "Error creating client config")
1576                        # XXX: This seer specific file should disappear
1577                        try:
1578                            cc = open("%s/%s/seer.conf" %
1579                                    (self.tmpdir, self.current_gateways),
1580                                    'w')
1581                            if self.current_gateways != self.master:
1582                                print >>cc, "ControlNode: %s" % \
1583                                        self.control_gateway
1584                            print >>cc, "ExperimentID: %s/%s" % \
1585                                    ( tbparams[self.master]['project'], \
1586                                    self.eid )
1587                            cc.close()
1588                        except IOError:
1589                            raise service_error(service_error.internal,
1590                                    "Error creating seer config")
1591                    else:
1592                        debug.error("[gateways]: No control gateway for %s" %\
1593                                    self.current_gateways)
1594                    self.current_gateways = None
1595                else:
1596                    dtb, myname, desthost, type = line.split(" ")
1597
1598                    if type == "control" or type == "both":
1599                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1600                                self.eid, 
1601                                tbparams[self.current_gateways]['project'],
1602                                tbparams[self.current_gateways]['domain'])
1603                    try:
1604                        active = self.gateway_conf_file(self.current_gateways,
1605                                self.master, self.eid, self.gw_pubkey_base,
1606                                self.gw_secretkey_base,
1607                                self.active_end, tbparams, dtb, myname,
1608                                desthost, type)
1609                    except IOError, e:
1610                        raise service_error(service_error.internal,
1611                                "Failed to write config file for %s" % \
1612                                        self.current_gateway)
1613           
1614                    gw_pubkey = "%s/keys/%s" % \
1615                            (self.tmpdir, self.gw_pubkey_base)
1616                    gw_secretkey = "%s/keys/%s" % \
1617                            (self.tmpdir, self.gw_secretkey_base)
1618
1619                    pkfile = "%s/%s/%s" % \
1620                            ( self.tmpdir, self.current_gateways, 
1621                                    self.gw_pubkey_base)
1622                    skfile = "%s/%s/%s" % \
1623                            ( self.tmpdir, self.current_gateways, 
1624                                    self.gw_secretkey_base)
1625
1626                    if not os.path.exists(pkfile):
1627                        try:
1628                            self.copy_file(gw_pubkey, pkfile)
1629                        except IOError:
1630                            service_error(service_error.internal,
1631                                    "Failed to copy pubkey file")
1632
1633                    if active and not os.path.exists(skfile):
1634                        try:
1635                            self.copy_file(gw_secretkey, skfile)
1636                        except IOError:
1637                            service_error(service_error.internal,
1638                                    "Failed to copy secretkey file")
1639                return True
1640
1641    class shunt_to_file:
1642        """
1643        Simple class to write data between two regexps to a file.
1644        """
1645        def __init__(self, begin, end, filename):
1646            """
1647            Begin shunting on a match of begin, stop on end, send data to
1648            filename.
1649            """
1650            self.begin = re.compile(begin)
1651            self.end = re.compile(end)
1652            self.in_shunt = False
1653            self.file = None
1654            self.filename = filename
1655
1656        def __call__(self, line):
1657            """
1658            Call this on each line in the input that may be shunted.
1659            """
1660            if not self.in_shunt:
1661                if self.begin.match(line):
1662                    self.in_shunt = True
1663                    try:
1664                        self.file = open(self.filename, "w")
1665                    except:
1666                        self.file = None
1667                        raise
1668                    return True
1669                else:
1670                    return False
1671            else:
1672                if self.end.match(line):
1673                    if self.file: 
1674                        self.file.close()
1675                        self.file = None
1676                    self.in_shunt = False
1677                else:
1678                    if self.file:
1679                        print >>self.file, line
1680                return True
1681
1682    class shunt_to_list:
1683        """
1684        Same interface as shunt_to_file.  Data collected in self.list, one list
1685        element per line.
1686        """
1687        def __init__(self, begin, end):
1688            self.begin = re.compile(begin)
1689            self.end = re.compile(end)
1690            self.in_shunt = False
1691            self.list = [ ]
1692       
1693        def __call__(self, line):
1694            if not self.in_shunt:
1695                if self.begin.match(line):
1696                    self.in_shunt = True
1697                    return True
1698                else:
1699                    return False
1700            else:
1701                if self.end.match(line):
1702                    self.in_shunt = False
1703                else:
1704                    self.list.append(line)
1705                return True
1706
1707    class shunt_to_string:
1708        """
1709        Same interface as shunt_to_file.  Data collected in self.str, all in
1710        one string.
1711        """
1712        def __init__(self, begin, end):
1713            self.begin = re.compile(begin)
1714            self.end = re.compile(end)
1715            self.in_shunt = False
1716            self.str = ""
1717       
1718        def __call__(self, line):
1719            if not self.in_shunt:
1720                if self.begin.match(line):
1721                    self.in_shunt = True
1722                    return True
1723                else:
1724                    return False
1725            else:
1726                if self.end.match(line):
1727                    self.in_shunt = False
1728                else:
1729                    self.str += line
1730                return True
1731
1732    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1733            tbparams, tmpdir, alloc_log=None):
1734        started = { }           # Testbeds where a sub-experiment started
1735                                # successfully
1736
1737        # XXX
1738        fail_soft = False
1739
1740        log = alloc_log or self.log
1741
1742        thread_pool = self.thread_pool(self.nthreads)
1743        threads = [ ]
1744
1745        for tb in [ k for k in allocated.keys() if k != master]:
1746            # Create and start a thread to start the segment, and save it to
1747            # get the return value later
1748            thread_pool.wait_for_slot()
1749            t  = self.pooled_thread(\
1750                    target=self.start_segment(log=log,
1751                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1752                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1753                    pdata=thread_pool, trace_file=self.trace_file)
1754            threads.append(t)
1755            t.start()
1756
1757        # Wait until all finish
1758        thread_pool.wait_for_all_done()
1759
1760        # If none failed, start the master
1761        failed = [ t.getName() for t in threads if not t.rv ]
1762
1763        if len(failed) == 0:
1764            starter = self.start_segment(log=log, 
1765                    keyfile=self.ssh_privkey_file, debug=self.debug)
1766            if not starter(master, eid, tbparams, tmpdir):
1767                failed.append(master)
1768
1769        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1770        # If one failed clean up, unless fail_soft is set
1771        if failed:
1772            if not fail_soft:
1773                thread_pool.clear()
1774                for tb in succeeded:
1775                    # Create and start a thread to stop the segment
1776                    thread_pool.wait_for_slot()
1777                    t  = self.pooled_thread(\
1778                            target=self.stop_segment(log=log,
1779                                keyfile=self.ssh_privkey_file,
1780                                debug=self.debug), 
1781                            args=(tb, eid, tbparams), name=tb,
1782                            pdata=thread_pool, trace_file=self.trace_file)
1783                    t.start()
1784                # Wait until all finish
1785                thread_pool.wait_for_all_done()
1786
1787                # release the allocations
1788                for tb in tbparams.keys():
1789                    self.release_access(tb, tbparams[tb]['allocID'])
1790                # Remove the placeholder
1791                self.state_lock.acquire()
1792                self.state[eid]['experimentStatus'] = 'failed'
1793                if self.state_filename: self.write_state()
1794                self.state_lock.release()
1795
1796                #raise service_error(service_error.federant,
1797                #    "Swap in failed on %s" % ",".join(failed))
1798                log.error("Swap in failed on %s" % ",".join(failed))
1799                return
1800        else:
1801            log.info("[start_segment]: Experiment %s active" % eid)
1802
1803        log.debug("[start_experiment]: removing %s" % tmpdir)
1804
1805        # Walk up tmpdir, deleting as we go
1806        for path, dirs, files in os.walk(tmpdir, topdown=False):
1807            for f in files:
1808                os.remove(os.path.join(path, f))
1809            for d in dirs:
1810                os.rmdir(os.path.join(path, d))
1811        os.rmdir(tmpdir)
1812
1813        # Insert the experiment into our state and update the disk copy
1814        self.state_lock.acquire()
1815        self.state[expid]['experimentStatus'] = 'active'
1816        self.state[eid] = self.state[expid]
1817        if self.state_filename: self.write_state()
1818        self.state_lock.release()
1819        return
1820
1821    def create_experiment(self, req, fid):
1822        """
1823        The external interface to experiment creation called from the
1824        dispatcher.
1825
1826        Creates a working directory, splits the incoming description using the
1827        splitter script and parses out the avrious subsections using the
1828        lcasses above.  Once each sub-experiment is created, use pooled threads
1829        to instantiate them and start it all up.
1830        """
1831
1832        if not self.auth.check_attribute(fid, 'create'):
1833            raise service_error(service_error.access, "Create access denied")
1834
1835        try:
1836            tmpdir = tempfile.mkdtemp(prefix="split-")
1837        except IOError:
1838            raise service_error(service_error.internal, "Cannot create tmp dir")
1839
1840        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1841        gw_secretkey_base = "fed.%s" % self.ssh_type
1842        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1843        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1844        tclfile = tmpdir + "/experiment.tcl"
1845        tbparams = { }
1846        try:
1847            access_user = self.accessdb[fid]
1848        except KeyError:
1849            raise service_error(service_error.internal,
1850                    "Access map and authorizer out of sync in " + \
1851                            "create_experiment for fedid %s"  % fid)
1852
1853        pid = "dummy"
1854        gid = "dummy"
1855        try:
1856            os.mkdir(tmpdir+"/keys")
1857        except OSError:
1858            raise service_error(service_error.internal,
1859                    "Can't make temporary dir")
1860
1861        req = req.get('CreateRequestBody', None)
1862        if not req:
1863            raise service_error(service_error.req,
1864                    "Bad request format (no CreateRequestBody)")
1865        # The tcl parser needs to read a file so put the content into that file
1866        descr=req.get('experimentdescription', None)
1867        if descr:
1868            file_content=descr.get('ns2description', None)
1869            if file_content:
1870                try:
1871                    f = open(tclfile, 'w')
1872                    f.write(file_content)
1873                    f.close()
1874                except IOError:
1875                    raise service_error(service_error.internal,
1876                            "Cannot write temp experiment description")
1877            else:
1878                raise service_error(service_error.req, 
1879                        "Only ns2descriptions supported")
1880        else:
1881            raise service_error(service_error.req, "No experiment description")
1882
1883        # Generate an ID for the experiment (slice) and a certificate that the
1884        # allocator can use to prove they own it.  We'll ship it back through
1885        # the encrypted connection.
1886        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1887
1888        if req.has_key('experimentID') and \
1889                req['experimentID'].has_key('localname'):
1890            overwrite = False
1891            eid = req['experimentID']['localname']
1892            # If there's an old failed experiment here with the same local name
1893            # and accessible by this user, we'll overwrite it, otherwise we'll
1894            # fall through and do the collision avoidance.
1895            old_expid = self.get_experiment_fedid(eid)
1896            if old_expid and self.check_experiment_access(fid, old_expid):
1897                self.state_lock.acquire()
1898                status = self.state[eid].get('experimentStatus', None)
1899                if status and status == 'failed':
1900                    # remove the old access attribute
1901                    self.auth.unset_attribute(fid, old_expid)
1902                    overwrite = True
1903                    del self.state[eid]
1904                    del self.state[old_expid]
1905                self.state_lock.release()
1906            self.state_lock.acquire()
1907            while (self.state.has_key(eid) and not overwrite):
1908                eid += random.choice(string.ascii_letters)
1909            # Initial state
1910            self.state[eid] = {
1911                    'experimentID' : \
1912                            [ { 'localname' : eid }, {'fedid': expid } ],
1913                    'experimentStatus': 'starting',
1914                    'experimentAccess': { 'X509' : expcert },
1915                    'owner': fid,
1916                    'log' : [],
1917                }
1918            self.state[expid] = self.state[eid]
1919            if self.state_filename: self.write_state()
1920            self.state_lock.release()
1921        else:
1922            eid = self.exp_stem
1923            for i in range(0,5):
1924                eid += random.choice(string.ascii_letters)
1925            self.state_lock.acquire()
1926            while (self.state.has_key(eid)):
1927                eid = self.exp_stem
1928                for i in range(0,5):
1929                    eid += random.choice(string.ascii_letters)
1930            # Initial state
1931            self.state[eid] = {
1932                    'experimentID' : \
1933                            [ { 'localname' : eid }, {'fedid': expid } ],
1934                    'experimentStatus': 'starting',
1935                    'experimentAccess': { 'X509' : expcert },
1936                    'owner': fid,
1937                    'log' : [],
1938                }
1939            self.state[expid] = self.state[eid]
1940            if self.state_filename: self.write_state()
1941            self.state_lock.release()
1942
1943        try: 
1944            # This catches exceptions to clear the placeholder if necessary
1945            try:
1946                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1947            except ValueError:
1948                raise service_error(service_error.server_config, 
1949                        "Bad key type (%s)" % self.ssh_type)
1950
1951            user = req.get('user', None)
1952            if user == None:
1953                raise service_error(service_error.req, "No user")
1954
1955            master = req.get('master', None)
1956            if not master:
1957                raise service_error(service_error.req,
1958                        "No master testbed label")
1959            export_project = req.get('exportProject', None)
1960            if not export_project:
1961                raise service_error(service_error.req, "No export project")
1962           
1963            if self.splitter_url:
1964                self.log.debug("Calling remote splitter at %s" % \
1965                        self.splitter_url)
1966                split_data = self.remote_splitter(self.splitter_url,
1967                        file_content, master)
1968            else:
1969                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1970                    str(self.muxmax), '-m', master]
1971
1972                if self.fedkit:
1973                    tclcmd.append('-k')
1974
1975                if self.gatewaykit:
1976                    tclcmd.append('-K')
1977
1978                tclcmd.extend([pid, gid, eid, tclfile])
1979
1980                self.log.debug("running local splitter %s", " ".join(tclcmd))
1981                # This is just fantastic.  As a side effect the parser copies
1982                # tb_compat.tcl into the current directory, so that directory
1983                # must be writable by the fedd user.  Doing this in the
1984                # temporary subdir ensures this is the case.
1985                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1986                        cwd=tmpdir)
1987                split_data = tclparser.stdout
1988
1989            allocated = { }         # Testbeds we can access
1990            # Objects to parse the splitter output (defined above)
1991            parse_current_testbed = self.current_testbed(eid, tmpdir,
1992                    self.fedkit, self.gatewaykit)
1993            parse_allbeds = self.allbeds(self.get_access)
1994            parse_gateways = self.gateways(eid, master, tmpdir,
1995                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1996                    self.fedkit)
1997            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1998                        "^#\s+End\s+Vtopo")
1999            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
2000                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
2001            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
2002                    "^#\s+End\s+tarfiles")
2003            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
2004                    "^#\s+End\s+rpms")
2005
2006            # Working on the split data
2007            for line in split_data:
2008                line = line.rstrip()
2009                if parse_current_testbed(line, master, allocated, tbparams):
2010                    continue
2011                elif parse_allbeds(line, user, tbparams, master, export_project,
2012                        access_user):
2013                    continue
2014                elif parse_gateways(line, allocated, tbparams):
2015                    continue
2016                elif parse_vtopo(line):
2017                    continue
2018                elif parse_hostnames(line):
2019                    continue
2020                elif parse_tarfiles(line):
2021                    continue
2022                elif parse_rpms(line):
2023                    continue
2024                else:
2025                    raise service_error(service_error.internal, 
2026                            "Bad tcl parse? %s" % line)
2027            # Virtual topology and visualization
2028            vtopo = self.gentopo(parse_vtopo.str)
2029            if not vtopo:
2030                raise service_error(service_error.internal, 
2031                        "Failed to generate virtual topology")
2032
2033            vis = self.genviz(vtopo)
2034            if not vis:
2035                raise service_error(service_error.internal, 
2036                        "Failed to generate visualization")
2037
2038           
2039            # save federant information
2040            for k in allocated.keys():
2041                tbparams[k]['federant'] = {\
2042                        'name': [ { 'localname' : eid} ],\
2043                        'emulab': tbparams[k]['emulab'],\
2044                        'allocID' : tbparams[k]['allocID'],\
2045                        'master' : k == master,\
2046                    }
2047
2048            self.state_lock.acquire()
2049            self.state[eid]['vtopo'] = vtopo
2050            self.state[eid]['vis'] = vis
2051            self.state[expid]['federant'] = \
2052                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2053                        if tbparams[tb].has_key('federant') ]
2054            if self.state_filename: self.write_state()
2055            self.state_lock.release()
2056
2057            # Copy tarfiles and rpms needed at remote sites into a staging area
2058            try:
2059                if self.fedkit:
2060                    for t in self.fedkit:
2061                        parse_tarfiles.list.append(t[1])
2062                if self.gatewaykit:
2063                    for t in self.gatewaykit:
2064                        parse_tarfiles.list.append(t[1])
2065                for t in parse_tarfiles.list:
2066                    if not os.path.exists("%s/tarfiles" % tmpdir):
2067                        os.mkdir("%s/tarfiles" % tmpdir)
2068                    self.copy_file(t, "%s/tarfiles/%s" % \
2069                            (tmpdir, os.path.basename(t)))
2070                for r in parse_rpms.list:
2071                    if not os.path.exists("%s/rpms" % tmpdir):
2072                        os.mkdir("%s/rpms" % tmpdir)
2073                    self.copy_file(r, "%s/rpms/%s" % \
2074                            (tmpdir, os.path.basename(r)))
2075                # A null experiment file in case we need to create a remote
2076                # experiment from scratch
2077                f = open("%s/null.tcl" % tmpdir, "w")
2078                print >>f, """
2079set ns [new Simulator]
2080source tb_compat.tcl
2081
2082set a [$ns node]
2083
2084$ns rtproto Session
2085$ns run
2086"""
2087                f.close()
2088
2089            except IOError, e:
2090                raise service_error(service_error.internal, 
2091                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2092
2093        except service_error, e:
2094            # If something goes wrong in the parse (usually an access error)
2095            # clear the placeholder state.  From here on out the code delays
2096            # exceptions.  Failing at this point returns a fault to the remote
2097            # caller.
2098            self.state_lock.acquire()
2099            del self.state[eid]
2100            del self.state[expid]
2101            if self.state_filename: self.write_state()
2102            self.state_lock.release()
2103            raise e
2104
2105
2106        # Start the background swapper and return the starting state.  From
2107        # here on out, the state will stick around a while.
2108
2109        # Let users touch the state
2110        self.auth.set_attribute(fid, expid)
2111        self.auth.set_attribute(expid, expid)
2112        # Override fedids can manipulate state as well
2113        for o in self.overrides:
2114            self.auth.set_attribute(o, expid)
2115
2116        # Create a logger that logs to the experiment's state object as well as
2117        # to the main log file.
2118        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2119        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2120        # XXX: there should be a global one of these rather than repeating the
2121        # code.
2122        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2123                    '%d %b %y %H:%M:%S'))
2124        alloc_log.addHandler(h)
2125       
2126        # Start a thread to do the resource allocation
2127        t  = Thread(target=self.allocate_resources,
2128                args=(allocated, master, eid, expid, expcert, tbparams, 
2129                    tmpdir, alloc_log),
2130                name=eid)
2131        t.start()
2132
2133        rv = {
2134                'experimentID': [
2135                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2136                ],
2137                'experimentStatus': 'starting',
2138                'experimentAccess': { 'X509' : expcert }
2139            }
2140
2141        return rv
2142
2143    class new_start_segment:
2144        def __init__(self, debug=False, log=None, cert_file=None,
2145                cert_pwd=None, trusted_certs=None, caller=None):
2146            self.log = log
2147            self.debug = debug
2148            self.cert_file = cert_file
2149            self.cert_pwd = cert_pwd
2150            self.trusted_certs = None
2151            self.caller = caller
2152
2153        def __call__(self, uri, aid, topo, attrs=None):
2154            req = {
2155                    'allocID': { 'fedid' : aid }, 
2156                    'segmentdescription': { 
2157                        'topdldescription': topo.to_dict(),
2158                    },
2159                }
2160            if attrs:
2161                req['fedAttr'] = attrs
2162
2163            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2164                    self.trusted_certs)
2165            print r
2166            return True
2167
2168
2169   
2170
2171    def new_allocate_resources(self, allocated, master, eid, expid, expcert, 
2172            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
2173        started = { }           # Testbeds where a sub-experiment started
2174                                # successfully
2175
2176        # XXX
2177        fail_soft = False
2178
2179        log = alloc_log or self.log
2180
2181        thread_pool = self.thread_pool(self.nthreads)
2182        threads = [ ]
2183
2184        for tb in [ k for k in allocated.keys() if k != master]:
2185            # Create and start a thread to start the segment, and save it to
2186            # get the return value later
2187            thread_pool.wait_for_slot()
2188            uri = self.tbmap.get(tb, None)
2189            if not uri:
2190                raise service_error(service_error.internal, 
2191                        "Unknown testbed %s !?" % tb)
2192
2193            if tbparams[tb].has_key('allocID') and \
2194                    tbparams[tb]['allocID'].has_key('fedid'):
2195                aid = tbparams[tb]['allocID']['fedid']
2196            else:
2197                raise service_error(service_error.internal, 
2198                        "No alloc id for testbed %s !?" % tb)
2199
2200            t  = self.pooled_thread(\
2201                    target=self.new_start_segment(log=log, debug=self.debug,
2202                        cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2203                        trusted_certs=self.trusted_certs,
2204                        caller=self.call_StartSegment), 
2205                    args=(uri, aid, topo[tb], attrs), name=tb,
2206                    pdata=thread_pool, trace_file=self.trace_file)
2207            threads.append(t)
2208            t.start()
2209
2210        # Wait until all finish
2211        thread_pool.wait_for_all_done()
2212
2213        # If none failed, start the master
2214        failed = [ t.getName() for t in threads if not t.rv ]
2215
2216        if len(failed) == 0:
2217            uri = self.tbmap.get(master, None)
2218            if not uri:
2219                raise service_error(service_error.internal, 
2220                        "Unknown testbed %s !?" % master)
2221
2222            if tbparams[master].has_key('allocID') and \
2223                    tbparams[master]['allocID'].has_key('fedid'):
2224                aid = tbparams[master]['allocID']['fedid']
2225            else:
2226                raise service_error(service_error.internal, 
2227                    "No alloc id for testbed %s !?" % master)
2228            starter = self.new_start_segment(log=log, debug=self.debug,
2229                    cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2230                    trusted_certs=self.trusted_certs,
2231                    caller=self.call_StartSegment)
2232            if not starter(uri, aid, topo[master]):
2233                failed.append(master)
2234
2235        succeeded = [tb for tb in allocated.keys() if tb not in failed]
2236        # If one failed clean up, unless fail_soft is set
2237        if failed and False:
2238            if not fail_soft:
2239                thread_pool.clear()
2240                for tb in succeeded:
2241                    # Create and start a thread to stop the segment
2242                    thread_pool.wait_for_slot()
2243                    t  = self.pooled_thread(\
2244                            target=self.stop_segment(log=log,
2245                                keyfile=self.ssh_privkey_file,
2246                                debug=self.debug), 
2247                            args=(tb, eid, tbparams), name=tb,
2248                            pdata=thread_pool, trace_file=self.trace_file)
2249                    t.start()
2250                # Wait until all finish
2251                thread_pool.wait_for_all_done()
2252
2253                # release the allocations
2254                for tb in tbparams.keys():
2255                    self.release_access(tb, tbparams[tb]['allocID'])
2256                # Remove the placeholder
2257                self.state_lock.acquire()
2258                self.state[eid]['experimentStatus'] = 'failed'
2259                if self.state_filename: self.write_state()
2260                self.state_lock.release()
2261
2262                log.error("Swap in failed on %s" % ",".join(failed))
2263                return
2264        else:
2265            log.info("[start_segment]: Experiment %s active" % eid)
2266
2267        log.debug("[start_experiment]: removing %s" % tmpdir)
2268
2269        # Walk up tmpdir, deleting as we go
2270        for path, dirs, files in os.walk(tmpdir, topdown=False):
2271            for f in files:
2272                os.remove(os.path.join(path, f))
2273            for d in dirs:
2274                os.rmdir(os.path.join(path, d))
2275        os.rmdir(tmpdir)
2276
2277        # Insert the experiment into our state and update the disk copy
2278        self.state_lock.acquire()
2279        self.state[expid]['experimentStatus'] = 'active'
2280        self.state[eid] = self.state[expid]
2281        if self.state_filename: self.write_state()
2282        self.state_lock.release()
2283        return
2284
2285
2286    def new_create_experiment(self, req, fid):
2287        """
2288        The external interface to experiment creation called from the
2289        dispatcher.
2290
2291        Creates a working directory, splits the incoming description using the
2292        splitter script and parses out the avrious subsections using the
2293        lcasses above.  Once each sub-experiment is created, use pooled threads
2294        to instantiate them and start it all up.
2295        """
2296
2297        def add_kit(e, kit):
2298            """
2299            Add a Software object created from the list of (install, location)
2300            tuples passed as kit  to the software attribute of an object e.  We
2301            do this enough to break out the code, but it's kind of a hack to
2302            avoid changing the old tuple rep.
2303            """
2304
2305            s = [ topdl.Software(install=i, location=l) for i, l in kit]
2306
2307            if isinstance(e.software, list): e.software.extend(s)
2308            else: e.software = s
2309
2310
2311        if not self.auth.check_attribute(fid, 'create'):
2312            raise service_error(service_error.access, "Create access denied")
2313
2314        try:
2315            tmpdir = tempfile.mkdtemp(prefix="split-")
2316        except IOError:
2317            raise service_error(service_error.internal, "Cannot create tmp dir")
2318
2319        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2320        gw_secretkey_base = "fed.%s" % self.ssh_type
2321        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2322        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2323        tclfile = tmpdir + "/experiment.tcl"
2324        tbparams = { }
2325        try:
2326            access_user = self.accessdb[fid]
2327        except KeyError:
2328            raise service_error(service_error.internal,
2329                    "Access map and authorizer out of sync in " + \
2330                            "create_experiment for fedid %s"  % fid)
2331
2332        pid = "dummy"
2333        gid = "dummy"
2334        try:
2335            os.mkdir(tmpdir+"/keys")
2336        except OSError:
2337            raise service_error(service_error.internal,
2338                    "Can't make temporary dir")
2339
2340        req = req.get('CreateRequestBody', None)
2341        if not req:
2342            raise service_error(service_error.req,
2343                    "Bad request format (no CreateRequestBody)")
2344        # The tcl parser needs to read a file so put the content into that file
2345        descr=req.get('experimentdescription', None)
2346        if descr:
2347            file_content=descr.get('ns2description', None)
2348            if file_content:
2349                try:
2350                    f = open(tclfile, 'w')
2351                    f.write(file_content)
2352                    f.close()
2353                except IOError:
2354                    raise service_error(service_error.internal,
2355                            "Cannot write temp experiment description")
2356            else:
2357                raise service_error(service_error.req, 
2358                        "Only ns2descriptions supported")
2359        else:
2360            raise service_error(service_error.req, "No experiment description")
2361
2362        # Generate an ID for the experiment (slice) and a certificate that the
2363        # allocator can use to prove they own it.  We'll ship it back through
2364        # the encrypted connection.
2365        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2366
2367        if req.has_key('experimentID') and \
2368                req['experimentID'].has_key('localname'):
2369            overwrite = False
2370            eid = req['experimentID']['localname']
2371            # If there's an old failed experiment here with the same local name
2372            # and accessible by this user, we'll overwrite it, otherwise we'll
2373            # fall through and do the collision avoidance.
2374            old_expid = self.get_experiment_fedid(eid)
2375            if old_expid and self.check_experiment_access(fid, old_expid):
2376                self.state_lock.acquire()
2377                status = self.state[eid].get('experimentStatus', None)
2378                if status and status == 'failed':
2379                    # remove the old access attribute
2380                    self.auth.unset_attribute(fid, old_expid)
2381                    overwrite = True
2382                    del self.state[eid]
2383                    del self.state[old_expid]
2384                self.state_lock.release()
2385            self.state_lock.acquire()
2386            while (self.state.has_key(eid) and not overwrite):
2387                eid += random.choice(string.ascii_letters)
2388            # Initial state
2389            self.state[eid] = {
2390                    'experimentID' : \
2391                            [ { 'localname' : eid }, {'fedid': expid } ],
2392                    'experimentStatus': 'starting',
2393                    'experimentAccess': { 'X509' : expcert },
2394                    'owner': fid,
2395                    'log' : [],
2396                }
2397            self.state[expid] = self.state[eid]
2398            if self.state_filename: self.write_state()
2399            self.state_lock.release()
2400        else:
2401            eid = self.exp_stem
2402            for i in range(0,5):
2403                eid += random.choice(string.ascii_letters)
2404            self.state_lock.acquire()
2405            while (self.state.has_key(eid)):
2406                eid = self.exp_stem
2407                for i in range(0,5):
2408                    eid += random.choice(string.ascii_letters)
2409            # Initial state
2410            self.state[eid] = {
2411                    'experimentID' : \
2412                            [ { 'localname' : eid }, {'fedid': expid } ],
2413                    'experimentStatus': 'starting',
2414                    'experimentAccess': { 'X509' : expcert },
2415                    'owner': fid,
2416                    'log' : [],
2417                }
2418            self.state[expid] = self.state[eid]
2419            if self.state_filename: self.write_state()
2420            self.state_lock.release()
2421
2422        try: 
2423            # This catches exceptions to clear the placeholder if necessary
2424            try:
2425                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2426            except ValueError:
2427                raise service_error(service_error.server_config, 
2428                        "Bad key type (%s)" % self.ssh_type)
2429
2430            user = req.get('user', None)
2431            if user == None:
2432                raise service_error(service_error.req, "No user")
2433
2434            master = req.get('master', None)
2435            if not master:
2436                raise service_error(service_error.req,
2437                        "No master testbed label")
2438            export_project = req.get('exportProject', None)
2439            if not export_project:
2440                raise service_error(service_error.req, "No export project")
2441           
2442            if self.splitter_url:
2443                self.log.debug("Calling remote splitter at %s" % \
2444                        self.splitter_url)
2445                split_data = self.remote_splitter(self.splitter_url,
2446                        file_content, master)
2447            else:
2448                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2449                    str(self.muxmax), '-m', master]
2450
2451                if self.fedkit:
2452                    tclcmd.append('-k')
2453
2454                if self.gatewaykit:
2455                    tclcmd.append('-K')
2456
2457                tclcmd.extend([pid, gid, eid, tclfile])
2458
2459                self.log.debug("running local splitter %s", " ".join(tclcmd))
2460                # This is just fantastic.  As a side effect the parser copies
2461                # tb_compat.tcl into the current directory, so that directory
2462                # must be writable by the fedd user.  Doing this in the
2463                # temporary subdir ensures this is the case.
2464                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2465                        cwd=tmpdir)
2466                split_data = tclparser.stdout
2467
2468            allocated = { }         # Testbeds we can access
2469            # Allocate IP addresses: The allocator is a buddy system memory
2470            # allocator.  Allocate from the largest substrate to the
2471            # smallest to make the packing more likely to work - i.e.
2472            # avoiding internal fragmentation.
2473            top = topdl.topology_from_xml(file=split_data, top="experiment")
2474            subs = sorted(top.substrates, 
2475                    cmp=lambda x,y: cmp(len(x.interfaces), 
2476                        len(y.interfaces)),
2477                    reverse=True)
2478            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
2479            ifs = { }
2480            hosts = [ ]
2481            # The config urlpath
2482            configpath = "/%s/config" % expid
2483            # The config file system location
2484            configdir ="%s%s" % ( self.repodir, configpath)
2485
2486            for idx, s in enumerate(subs):
2487                a = ips.allocate(len(s.interfaces)+2)
2488                if a :
2489                    base, num = a
2490                    if num < len(s.interfaces) +2 : 
2491                        raise service_error(service_error.internal,
2492                                "Allocator returned wrong number of IPs??")
2493                else:
2494                    raise service_error(service_error.req, 
2495                            "Cannot allocate IP addresses")
2496
2497                base += 1
2498                for i in s.interfaces:
2499                    i.attribute.append(
2500                            topdl.Attribute('ip4_address', 
2501                                "%s" % ip_addr(base)))
2502                    hname = i.element.name[0]
2503                    if ifs.has_key(hname):
2504                        hosts.append("%s\t%s-%s %s-%d" % \
2505                                (ip_addr(base), hname, s.name, hname,
2506                                    ifs[hname]))
2507                    else:
2508                        ifs[hname] = 0
2509                        hosts.append("%s\t%s-%s %s-%d %s" % \
2510                                (ip_addr(base), hname, s.name, hname,
2511                                    ifs[hname], hname))
2512
2513                    ifs[hname] += 1
2514                    base += 1
2515            # save config files
2516            try:
2517                os.makedirs(configdir)
2518            except IOError, e:
2519                raise service_error(
2520                        "Cannot create config directory: %s" % e)
2521            # Find the testbeds to look up
2522            testbeds = set([ a.value for e in top.elements \
2523                    for a in e.attribute \
2524                        if a.attribute == 'testbed'] )
2525
2526
2527            # Make per testbed topologies.  Copy the main topo and remove
2528            # interfaces and nodes that don't live in the testbed.
2529            topo ={ }
2530            for tb in testbeds:
2531                self.get_access(tb, None, user, tbparams, master,
2532                        export_project, access_user)
2533                allocated[tb] = 1
2534                topo[tb] = top.clone()
2535                to_delete = [ ]
2536                for e in topo[tb].elements:
2537                    etb = e.get_attribute('testbed')
2538                    if etb and etb != tb:
2539                        for i in e.interface:
2540                            for s in i.subs:
2541                                try:
2542                                    s.interfaces.remove(i)
2543                                except ValueError:
2544                                    raise service_error(service_error.internal,
2545                                            "Can't remove interface??")
2546                        to_delete.append(e)
2547                for e in to_delete:
2548                    topo[tb].elements.remove(e)
2549                topo[tb].make_indices()
2550
2551                for e in topo[tb].elements:
2552                    if tb == master:
2553                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
2554                    else:
2555                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
2556                    scmd = e.get_attribute('startup')
2557                    if scmd:
2558                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
2559
2560                    e.set_attribute('startup', cmd)
2561                    if self.fedkit: add_kit(e, self.fedkit)
2562
2563            # Copy configuration files into the remote file store
2564            try:
2565                f = open("%s/hosts" % configdir, "w")
2566                f.write('\n'.join(hosts))
2567                f.close()
2568            except IOError, e:
2569                raise service_error(service_error.internal, 
2570                        "Cannot write hosts file: %s" % e)
2571            try:
2572                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
2573                        (configdir, gw_pubkey_base))
2574                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
2575                        (configdir, gw_secretkey_base))
2576            except IOError, e:
2577                raise service_error(service_error.internal, 
2578                        "Cannot copy keyfiles: %s" % e)
2579
2580            # Allow the individual testbeds to access the configuration files.
2581            for tb in tbparams.keys():
2582                asignee = tbparams[tb]['allocID']['fedid']
2583                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2584                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2585                    print "assigned %s/%s" % (configpath, f)
2586
2587            # Now, for each substrate in the main topology, find those that
2588            # have nodes on more than one testbed.  Insert portal nodes
2589            # into the copies of those substrates on the sub topologies.
2590            for s in top.substrates:
2591                # tbs will contain an ip address on this subsrate that is in
2592                # each testbed.
2593                tbs = { }
2594                for i in s.interfaces:
2595                    e = i.element
2596                    tb = e.get_attribute('testbed')
2597                    if tb and not tbs.has_key(tb):
2598                        for i in e.interface:
2599                            if s in i.subs:
2600                                tbs[tb]= i.get_attribute('ip4_address')
2601                if len(tbs) < 2:
2602                    continue
2603
2604                # More than one testbed is on this substrate.  Insert
2605                # some portals into the subtopologies.  st == source testbed,
2606                # dt == destination testbed.
2607                segment_substrate = { }
2608                for st in tbs.keys():
2609                    segment_substrate[st] = { }
2610                    for dt in [ t for t in tbs.keys() if t != st]:
2611                        myname =  "%stunnel" % dt
2612                        desthost  =  "%stunnel" % st
2613                        sproject = tbparams[st].get('project', 'project')
2614                        dproject = tbparams[dt].get('project', 'project')
2615                        sdomain = ".%s.%s%s" % (eid, sproject,
2616                                tbparams[st].get('domain', ".example.com"))
2617                        ddomain = ".%s.%s%s" % (eid, dproject,
2618                                tbparams[dt].get('domain', ".example.com"))
2619                        mdomain = "%s.%s%s" % (eid, 
2620                                tbparams[master].get('project', 'project'),
2621                                tbparams[master].get('domain', '.example.com'))
2622                        # XXX: active and type need to be unkludged
2623                        active = ("%s" % (st == master))
2624                        if not segment_substrate[st].has_key(dt):
2625                            # Put a substrate and a segment for the connected
2626                            # testbed in there.
2627                            tsubstrate = \
2628                                    topdl.Substrate(name='%s-%s' % (st, dt))
2629                            segment_element = topdl.Segment(
2630                                    id= tbparams[dt]['allocID'],
2631                                    type='emulab',
2632                                    uri = self.tbmap.get(dt, None),
2633                                    interface=[ 
2634                                        topdl.Interface(
2635                                            substrate=tsubstrate.name),
2636                                        ],
2637                                    attribute = [
2638                                        topdl.Attribute(attribute=n, value=v)
2639                                            for n, v in (\
2640                                                ('domain', ddomain),
2641                                                ('experiment', "%s/%s" % \
2642                                                        (dproject, eid)),)
2643                                        ],
2644                                    )
2645                            segment_substrate[st][dt] = tsubstrate
2646                            topo[st].substrates.append(tsubstrate)
2647                            topo[st].elements.append(segment_element)
2648                        portal = topdl.Computer(
2649                                name="%stunnel" % dt,
2650                                attribute=[ 
2651                                    topdl.Attribute(attribute=n,value=v)
2652                                        for n, v in (\
2653                                            ('portal', 'true'),
2654                                            ('masterdomain', mdomain),
2655                                            ('experiment', "%s/%s" % \
2656                                                    (sproject, eid)),
2657                                            ('peer', "%s.%s" % \
2658                                                    (desthost, ddomain)),
2659                                            ('scriptdir', 
2660                                                "/usr/local/federation/bin"),
2661                                            ('active', "%s" % active),
2662                                            ('type', 'both'), 
2663                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), sdomain.lower())))
2664                                    ],
2665                                interface=[
2666                                    topdl.Interface(
2667                                        substrate=s.name,
2668                                        attribute=[ 
2669                                            topdl.Attribute(
2670                                                attribute='ip4_addreess', 
2671                                                value=tbs[dt]
2672                                            )
2673                                        ]),
2674                                    topdl.Interface(
2675                                        substrate=\
2676                                            segment_substrate[st][dt].name
2677                                        ),
2678                                    ],
2679                                )
2680                        if self.fedkit: add_kit(portal, self.fedkit)
2681                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
2682
2683                        topo[st].elements.append(portal)
2684
2685            # Connect the gateway nodes into the topologies and clear out
2686            # substrates that are not in the topologies
2687            for tb in testbeds:
2688                topo[tb].incorporate_elements()
2689                topo[tb].substrates = \
2690                        [s for s in topo[tb].substrates \
2691                            if len(s.interfaces) >0]
2692
2693            # Copy the rpms and tarfiles to a distribution directory from
2694            # which the federants can retrieve them
2695            linkpath = "%s/software" %  expid
2696            softdir ="%s/%s" % ( self.repodir, linkpath)
2697            softmap = { }
2698            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
2699                    for p, t in l ])
2700            pkgs.update([x.location for e in top.elements \
2701                    for x in e.software])
2702            try:
2703                os.makedirs(softdir)
2704            except IOError, e:
2705                raise service_error(
2706                        "Cannot create software directory: %s" % e)
2707            for pkg in pkgs:
2708                loc = pkg
2709
2710                scheme, host, path = urlparse(loc)[0:3]
2711                dest = os.path.basename(path)
2712                if not scheme:
2713                    if not loc.startswith('/'):
2714                        loc = "/%s" % loc
2715                    loc = "file://%s" %loc
2716                try:
2717                    u = urlopen(loc)
2718                except Exception, e:
2719                    raise service_error(service_error.req, 
2720                            "Cannot open %s: %s" % (loc, e))
2721                try:
2722                    f = open("%s/%s" % (softdir, dest) , "w")
2723                    self.log.debug("Writing %s/%s" % (softdir,dest) )
2724                    data = u.read(4096)
2725                    while data:
2726                        f.write(data)
2727                        data = u.read(4096)
2728                    f.close()
2729                    u.close()
2730                except Exception, e:
2731                    raise service_error(service_error.internal,
2732                            "Could not copy %s: %s" % (loc, e))
2733                path = re.sub("/tmp", "", linkpath)
2734                # XXX
2735                softmap[pkg] = \
2736                        "https://users.isi.deterlab.net:23232/%s/%s" %\
2737                        ( path, dest)
2738
2739                # Allow the individual testbeds to access the software.
2740                for tb in tbparams.keys():
2741                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
2742                            "/%s/%s" % ( path, dest))
2743
2744            # Convert the software locations in the segments into the local
2745            # copies on this host
2746            for soft in [ s for tb in topo.values() \
2747                    for e in tb.elements \
2748                        if getattr(e, 'software', False) \
2749                            for s in e.software ]:
2750                if softmap.has_key(soft.location):
2751                    soft.location = softmap[soft.location]
2752
2753            vtopo = topdl.topology_to_vtopo(top)
2754            vis = self.genviz(vtopo)
2755
2756            # save federant information
2757            for k in allocated.keys():
2758                tbparams[k]['federant'] = {\
2759                        'name': [ { 'localname' : eid} ],\
2760                        'emulab': tbparams[k]['emulab'],\
2761                        'allocID' : tbparams[k]['allocID'],\
2762                        'master' : k == master,\
2763                    }
2764
2765            self.state_lock.acquire()
2766            self.state[eid]['vtopo'] = vtopo
2767            self.state[eid]['vis'] = vis
2768            self.state[expid]['federant'] = \
2769                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2770                        if tbparams[tb].has_key('federant') ]
2771            if self.state_filename: 
2772                self.write_state()
2773            self.state_lock.release()
2774        except service_error, e:
2775            # If something goes wrong in the parse (usually an access error)
2776            # clear the placeholder state.  From here on out the code delays
2777            # exceptions.  Failing at this point returns a fault to the remote
2778            # caller.
2779
2780            self.state_lock.acquire()
2781            del self.state[eid]
2782            del self.state[expid]
2783            if self.state_filename: self.write_state()
2784            self.state_lock.release()
2785            raise e
2786
2787
2788        # Start the background swapper and return the starting state.  From
2789        # here on out, the state will stick around a while.
2790
2791        # Let users touch the state
2792        self.auth.set_attribute(fid, expid)
2793        self.auth.set_attribute(expid, expid)
2794        # Override fedids can manipulate state as well
2795        for o in self.overrides:
2796            self.auth.set_attribute(o, expid)
2797
2798        # Create a logger that logs to the experiment's state object as well as
2799        # to the main log file.
2800        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2801        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2802        # XXX: there should be a global one of these rather than repeating the
2803        # code.
2804        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2805                    '%d %b %y %H:%M:%S'))
2806        alloc_log.addHandler(h)
2807       
2808        # XXX
2809        url_base = 'https://users.isi.deterlab.net:23232'
2810        attrs = [ 
2811                {
2812                    'attribute': 'ssh_pubkey', 
2813                    'value': '%s/%s/config/%s' % \
2814                            (url_base, expid, gw_pubkey_base)
2815                },
2816                {
2817                    'attribute': 'ssh_secretkey', 
2818                    'value': '%s/%s/config/%s' % \
2819                            (url_base, expid, gw_secretkey_base)
2820                },
2821                {
2822                    'attribute': 'hosts', 
2823                    'value': '%s/%s/config/hosts' % \
2824                            (url_base, expid)
2825                },
2826            ]
2827
2828        # Start a thread to do the resource allocation
2829        t  = Thread(target=self.new_allocate_resources,
2830                args=(allocated, master, eid, expid, expcert, tbparams, 
2831                    topo, tmpdir, alloc_log, attrs),
2832                name=eid)
2833        t.start()
2834
2835        rv = {
2836                'experimentID': [
2837                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2838                ],
2839                'experimentStatus': 'starting',
2840                'experimentAccess': { 'X509' : expcert }
2841            }
2842
2843        return rv
2844   
2845    def get_experiment_fedid(self, key):
2846        """
2847        find the fedid associated with the localname key in the state database.
2848        """
2849
2850        rv = None
2851        self.state_lock.acquire()
2852        if self.state.has_key(key):
2853            if isinstance(self.state[key], dict):
2854                try:
2855                    kl = [ f['fedid'] for f in \
2856                            self.state[key]['experimentID']\
2857                                if f.has_key('fedid') ]
2858                except KeyError:
2859                    self.state_lock.release()
2860                    raise service_error(service_error.internal, 
2861                            "No fedid for experiment %s when getting "+\
2862                                    "fedid(!?)" % key)
2863                if len(kl) == 1:
2864                    rv = kl[0]
2865                else:
2866                    self.state_lock.release()
2867                    raise service_error(service_error.internal, 
2868                            "multiple fedids for experiment %s when " +\
2869                                    "getting fedid(!?)" % key)
2870            else:
2871                self.state_lock.release()
2872                raise service_error(service_error.internal, 
2873                        "Unexpected state for %s" % key)
2874        self.state_lock.release()
2875        return rv
2876
2877    def check_experiment_access(self, fid, key):
2878        """
2879        Confirm that the fid has access to the experiment.  Though a request
2880        may be made in terms of a local name, the access attribute is always
2881        the experiment's fedid.
2882        """
2883        if not isinstance(key, fedid):
2884            key = self.get_experiment_fedid(key)
2885
2886        if self.auth.check_attribute(fid, key):
2887            return True
2888        else:
2889            raise service_error(service_error.access, "Access Denied")
2890
2891
2892    def get_handler(self, path, fid):
2893        print "%s" %  path
2894        if self.auth.check_attribute(fid, path):
2895            return ("%s/%s" % (self.repodir, path), "application/binary")
2896        else:
2897            return (None, None)
2898
2899    def get_vtopo(self, req, fid):
2900        """
2901        Return the stored virtual topology for this experiment
2902        """
2903        rv = None
2904        state = None
2905
2906        req = req.get('VtopoRequestBody', None)
2907        if not req:
2908            raise service_error(service_error.req,
2909                    "Bad request format (no VtopoRequestBody)")
2910        exp = req.get('experiment', None)
2911        if exp:
2912            if exp.has_key('fedid'):
2913                key = exp['fedid']
2914                keytype = "fedid"
2915            elif exp.has_key('localname'):
2916                key = exp['localname']
2917                keytype = "localname"
2918            else:
2919                raise service_error(service_error.req, "Unknown lookup type")
2920        else:
2921            raise service_error(service_error.req, "No request?")
2922
2923        self.check_experiment_access(fid, key)
2924
2925        self.state_lock.acquire()
2926        if self.state.has_key(key):
2927            if self.state[key].has_key('vtopo'):
2928                rv = { 'experiment' : {keytype: key },\
2929                        'vtopo': self.state[key]['vtopo'],\
2930                    }
2931            else:
2932                state = self.state[key]['experimentStatus']
2933        self.state_lock.release()
2934
2935        if rv: return rv
2936        else: 
2937            if state:
2938                raise service_error(service_error.partial, 
2939                        "Not ready: %s" % state)
2940            else:
2941                raise service_error(service_error.req, "No such experiment")
2942
2943    def get_vis(self, req, fid):
2944        """
2945        Return the stored visualization for this experiment
2946        """
2947        rv = None
2948        state = None
2949
2950        req = req.get('VisRequestBody', None)
2951        if not req:
2952            raise service_error(service_error.req,
2953                    "Bad request format (no VisRequestBody)")
2954        exp = req.get('experiment', None)
2955        if exp:
2956            if exp.has_key('fedid'):
2957                key = exp['fedid']
2958                keytype = "fedid"
2959            elif exp.has_key('localname'):
2960                key = exp['localname']
2961                keytype = "localname"
2962            else:
2963                raise service_error(service_error.req, "Unknown lookup type")
2964        else:
2965            raise service_error(service_error.req, "No request?")
2966
2967        self.check_experiment_access(fid, key)
2968
2969        self.state_lock.acquire()
2970        if self.state.has_key(key):
2971            if self.state[key].has_key('vis'):
2972                rv =  { 'experiment' : {keytype: key },\
2973                        'vis': self.state[key]['vis'],\
2974                        }
2975            else:
2976                state = self.state[key]['experimentStatus']
2977        self.state_lock.release()
2978
2979        if rv: return rv
2980        else:
2981            if state:
2982                raise service_error(service_error.partial, 
2983                        "Not ready: %s" % state)
2984            else:
2985                raise service_error(service_error.req, "No such experiment")
2986
2987    def clean_info_response(self, rv):
2988        """
2989        Remove the information in the experiment's state object that is not in
2990        the info response.
2991        """
2992        # Remove the owner info (should always be there, but...)
2993        if rv.has_key('owner'): del rv['owner']
2994
2995        # Convert the log into the allocationLog parameter and remove the
2996        # log entry (with defensive programming)
2997        if rv.has_key('log'):
2998            rv['allocationLog'] = "".join(rv['log'])
2999            del rv['log']
3000        else:
3001            rv['allocationLog'] = ""
3002
3003        if rv['experimentStatus'] != 'active':
3004            if rv.has_key('federant'): del rv['federant']
3005        else:
3006            # remove the allocationID info from each federant
3007            for f in rv.get('federant', []):
3008                if f.has_key('allocID'): del f['allocID']
3009        return rv
3010
3011    def get_info(self, req, fid):
3012        """
3013        Return all the stored info about this experiment
3014        """
3015        rv = None
3016
3017        req = req.get('InfoRequestBody', None)
3018        if not req:
3019            raise service_error(service_error.req,
3020                    "Bad request format (no InfoRequestBody)")
3021        exp = req.get('experiment', None)
3022        if exp:
3023            if exp.has_key('fedid'):
3024                key = exp['fedid']
3025                keytype = "fedid"
3026            elif exp.has_key('localname'):
3027                key = exp['localname']
3028                keytype = "localname"
3029            else:
3030                raise service_error(service_error.req, "Unknown lookup type")
3031        else:
3032            raise service_error(service_error.req, "No request?")
3033
3034        self.check_experiment_access(fid, key)
3035
3036        # The state may be massaged by the service function that called
3037        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
3038        # state.
3039        self.state_lock.acquire()
3040        if self.state.has_key(key):
3041            rv = copy.deepcopy(self.state[key])
3042        self.state_lock.release()
3043
3044        if rv:
3045            return self.clean_info_response(rv)
3046        else:
3047            raise service_error(service_error.req, "No such experiment")
3048
3049    def get_multi_info(self, req, fid):
3050        """
3051        Return all the stored info that this fedid can access
3052        """
3053        rv = { 'info': [ ] }
3054
3055        self.state_lock.acquire()
3056        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
3057            self.check_experiment_access(fid, key)
3058
3059            if self.state.has_key(key):
3060                e = copy.deepcopy(self.state[key])
3061                e = self.clean_info_response(e)
3062                rv['info'].append(e)
3063        self.state_lock.release()
3064        return rv
3065
3066
3067    def terminate_experiment(self, req, fid):
3068        """
3069        Swap this experiment out on the federants and delete the shared
3070        information
3071        """
3072        tbparams = { }
3073        req = req.get('TerminateRequestBody', None)
3074        if not req:
3075            raise service_error(service_error.req,
3076                    "Bad request format (no TerminateRequestBody)")
3077        force = req.get('force', False)
3078        exp = req.get('experiment', None)
3079        if exp:
3080            if exp.has_key('fedid'):
3081                key = exp['fedid']
3082                keytype = "fedid"
3083            elif exp.has_key('localname'):
3084                key = exp['localname']
3085                keytype = "localname"
3086            else:
3087                raise service_error(service_error.req, "Unknown lookup type")
3088        else:
3089            raise service_error(service_error.req, "No request?")
3090
3091        self.check_experiment_access(fid, key)
3092
3093        dealloc_list = [ ]
3094
3095
3096        # Create a logger that logs to the dealloc_list as well as to the main
3097        # log file.
3098        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3099        h = logging.StreamHandler(self.list_log(dealloc_list))
3100        # XXX: there should be a global one of these rather than repeating the
3101        # code.
3102        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3103                    '%d %b %y %H:%M:%S'))
3104        dealloc_log.addHandler(h)
3105
3106        self.state_lock.acquire()
3107        fed_exp = self.state.get(key, None)
3108
3109        if fed_exp:
3110            # This branch of the conditional holds the lock to generate a
3111            # consistent temporary tbparams variable to deallocate experiments.
3112            # It releases the lock to do the deallocations and reacquires it to
3113            # remove the experiment state when the termination is complete.
3114
3115            # First make sure that the experiment creation is complete.
3116            status = fed_exp.get('experimentStatus', None)
3117
3118            if status:
3119                if status in ('starting', 'terminating'):
3120                    if not force:
3121                        self.state_lock.release()
3122                        raise service_error(service_error.partial, 
3123                                'Experiment still being created or destroyed')
3124                    else:
3125                        self.log.warning('Experiment in %s state ' % status + \
3126                                'being terminated by force.')
3127            else:
3128                # No status??? trouble
3129                self.state_lock.release()
3130                raise service_error(service_error.internal,
3131                        "Experiment has no status!?")
3132
3133            ids = []
3134            #  experimentID is a list of dicts that are self-describing
3135            #  identifiers.  This finds all the fedids and localnames - the
3136            #  keys of self.state - and puts them into ids.
3137            for id in fed_exp.get('experimentID', []):
3138                if id.has_key('fedid'): ids.append(id['fedid'])
3139                if id.has_key('localname'): ids.append(id['localname'])
3140
3141            # Construct enough of the tbparams to make the stop_segment calls
3142            # work
3143            for fed in fed_exp.get('federant', []):
3144                try:
3145                    for e in fed['name']:
3146                        eid = e.get('localname', None)
3147                        if eid: break
3148                    else:
3149                        continue
3150
3151                    p = fed['emulab']['project']
3152
3153                    project = p['name']['localname']
3154                    tb = p['testbed']['localname']
3155                    user = p['user'][0]['userID']['localname']
3156
3157                    domain = fed['emulab']['domain']
3158                    host  = fed['emulab']['ops']
3159                    aid = fed['allocID']
3160                except KeyError, e:
3161                    continue
3162                tbparams[tb] = {\
3163                        'user': user,\
3164                        'domain': domain,\
3165                        'project': project,\
3166                        'host': host,\
3167                        'eid': eid,\
3168                        'aid': aid,\
3169                    }
3170            fed_exp['experimentStatus'] = 'terminating'
3171            if self.state_filename: self.write_state()
3172            self.state_lock.release()
3173
3174            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3175            # then completes, so we can't wait if nothing starts.  So, no
3176            # tbparams, no start.
3177            if len(tbparams) > 0:
3178                thread_pool = self.thread_pool(self.nthreads)
3179                for tb in tbparams.keys():
3180                    # Create and start a thread to stop the segment
3181                    thread_pool.wait_for_slot()
3182                    t  = self.pooled_thread(\
3183                            target=self.stop_segment(log=dealloc_log,
3184                                keyfile=self.ssh_privkey_file, debug=self.debug), 
3185                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
3186                            pdata=thread_pool, trace_file=self.trace_file)
3187                    t.start()
3188                # Wait for completions
3189                thread_pool.wait_for_all_done()
3190
3191            # release the allocations (failed experiments have done this
3192            # already, and starting experiments may be in odd states, so we
3193            # ignore errors releasing those allocations
3194            try: 
3195                for tb in tbparams.keys():
3196                    self.release_access(tb, tbparams[tb]['aid'])
3197            except service_error, e:
3198                if status != 'failed' and not force:
3199                    raise e
3200
3201            # Remove the terminated experiment
3202            self.state_lock.acquire()
3203            for id in ids:
3204                if self.state.has_key(id): del self.state[id]
3205
3206            if self.state_filename: self.write_state()
3207            self.state_lock.release()
3208
3209            return { 
3210                    'experiment': exp , 
3211                    'deallocationLog': "".join(dealloc_list),
3212                    }
3213        else:
3214            # Don't forget to release the lock
3215            self.state_lock.release()
3216            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.