source: fedd/federation/experiment_control.py @ ecca6eb

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since ecca6eb was ecca6eb, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint

  • Property mode set to 100644
File size: 124.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_Ns2Split = service_caller('Ns2Split')
200
201    def __init__(self, config=None, auth=None):
202        """
203        Intialize the various attributes, most from the config object
204        """
205
206        def parse_tarfile_list(tf):
207            """
208            Parse a tarfile list from the configuration.  This is a set of
209            paths and tarfiles separated by spaces.
210            """
211            rv = [ ]
212            if tf is not None:
213                tl = tf.split()
214                while len(tl) > 1:
215                    p, t = tl[0:2]
216                    del tl[0:2]
217                    rv.append((p, t))
218            return rv
219
220        self.thread_with_rv = experiment_control_local.pooled_thread
221        self.thread_pool = experiment_control_local.thread_pool
222        self.list_log = experiment_control_local.list_log
223
224        self.cert_file = config.get("experiment_control", "cert_file")
225        if self.cert_file:
226            self.cert_pwd = config.get("experiment_control", "cert_pwd")
227        else:
228            self.cert_file = config.get("globals", "cert_file")
229            self.cert_pwd = config.get("globals", "cert_pwd")
230
231        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
232                or config.get("globals", "trusted_certs")
233
234        self.repodir = config.get("experiment_control", "repodir")
235
236        self.exp_stem = "fed-stem"
237        self.log = logging.getLogger("fedd.experiment_control")
238        set_log_level(config, "experiment_control", self.log)
239        self.muxmax = 2
240        self.nthreads = 2
241        self.randomize_experiments = False
242
243        self.splitter = None
244        self.ssh_keygen = "/usr/bin/ssh-keygen"
245        self.ssh_identity_file = None
246
247
248        self.debug = config.getboolean("experiment_control", "create_debug")
249        self.state_filename = config.get("experiment_control", 
250                "experiment_state")
251        self.splitter_url = config.get("experiment_control", "splitter_uri")
252        self.fedkit = parse_tarfile_list(\
253                config.get("experiment_control", "fedkit"))
254        self.gatewaykit = parse_tarfile_list(\
255                config.get("experiment_control", "gatewaykit"))
256        accessdb_file = config.get("experiment_control", "accessdb")
257
258        self.ssh_pubkey_file = config.get("experiment_control", 
259                "ssh_pubkey_file")
260        self.ssh_privkey_file = config.get("experiment_control",
261                "ssh_privkey_file")
262        # NB for internal master/slave ops, not experiment setup
263        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
264
265        self.overrides = set([])
266        ovr = config.get('experiment_control', 'overrides')
267        if ovr:
268            for o in ovr.split(","):
269                o = o.strip()
270                if o.startswith('fedid:'): o = o[len('fedid:'):]
271                self.overrides.add(fedid(hexstr=o))
272
273        self.state = { }
274        self.state_lock = Lock()
275        self.tclsh = "/usr/local/bin/otclsh"
276        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
277                config.get("experiment_control", "tcl_splitter",
278                        "/usr/testbed/lib/ns2ir/parse.tcl")
279        mapdb_file = config.get("experiment_control", "mapdb")
280        self.trace_file = sys.stderr
281
282        self.def_expstart = \
283                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
284                "/tmp/federate";
285        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
286                "FEDDIR/hosts";
287        self.def_gwstart = \
288                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
289                "/tmp/bridge.log";
290        self.def_mgwstart = \
291                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
292                "/tmp/bridge.log";
293        self.def_gwimage = "FBSD61-TUNNEL2";
294        self.def_gwtype = "pc";
295        self.local_access = { }
296
297        if auth:
298            self.auth = auth
299        else:
300            self.log.error(\
301                    "[access]: No authorizer initialized, creating local one.")
302            auth = authorizer()
303
304
305        if self.ssh_pubkey_file:
306            try:
307                f = open(self.ssh_pubkey_file, 'r')
308                self.ssh_pubkey = f.read()
309                f.close()
310            except IOError:
311                raise service_error(service_error.internal,
312                        "Cannot read sshpubkey")
313        else:
314            raise service_error(service_error.internal, 
315                    "No SSH public key file?")
316
317        if not self.ssh_privkey_file:
318            raise service_error(service_error.internal, 
319                    "No SSH public key file?")
320
321
322        if mapdb_file:
323            self.read_mapdb(mapdb_file)
324        else:
325            self.log.warn("[experiment_control] No testbed map, using defaults")
326            self.tbmap = { 
327                    'deter':'https://users.isi.deterlab.net:23235',
328                    'emulab':'https://users.isi.deterlab.net:23236',
329                    'ucb':'https://users.isi.deterlab.net:23237',
330                    }
331
332        if accessdb_file:
333                self.read_accessdb(accessdb_file)
334        else:
335            raise service_error(service_error.internal,
336                    "No accessdb specified in config")
337
338        # Grab saved state.  OK to do this w/o locking because it's read only
339        # and only one thread should be in existence that can see self.state at
340        # this point.
341        if self.state_filename:
342            self.read_state()
343
344        # Dispatch tables
345        self.soap_services = {\
346                'Create': soap_handler('Create', self.new_create_experiment),
347                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
348                'Vis': soap_handler('Vis', self.get_vis),
349                'Info': soap_handler('Info', self.get_info),
350                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
351                'Terminate': soap_handler('Terminate', 
352                    self.terminate_experiment),
353        }
354
355        self.xmlrpc_services = {\
356                'Create': xmlrpc_handler('Create', self.new_create_experiment),
357                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
358                'Vis': xmlrpc_handler('Vis', self.get_vis),
359                'Info': xmlrpc_handler('Info', self.get_info),
360                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
361                'Terminate': xmlrpc_handler('Terminate',
362                    self.terminate_experiment),
363        }
364
365    def copy_file(self, src, dest, size=1024):
366        """
367        Exceedingly simple file copy.
368        """
369        s = open(src,'r')
370        d = open(dest, 'w')
371
372        buf = "x"
373        while buf != "":
374            buf = s.read(size)
375            d.write(buf)
376        s.close()
377        d.close()
378
379    # Call while holding self.state_lock
380    def write_state(self):
381        """
382        Write a new copy of experiment state after copying the existing state
383        to a backup.
384
385        State format is a simple pickling of the state dictionary.
386        """
387        if os.access(self.state_filename, os.W_OK):
388            self.copy_file(self.state_filename, \
389                    "%s.bak" % self.state_filename)
390        try:
391            f = open(self.state_filename, 'w')
392            pickle.dump(self.state, f)
393        except IOError, e:
394            self.log.error("Can't write file %s: %s" % \
395                    (self.state_filename, e))
396        except pickle.PicklingError, e:
397            self.log.error("Pickling problem: %s" % e)
398        except TypeError, e:
399            self.log.error("Pickling problem (TypeError): %s" % e)
400
401    # Call while holding self.state_lock
402    def read_state(self):
403        """
404        Read a new copy of experiment state.  Old state is overwritten.
405
406        State format is a simple pickling of the state dictionary.
407        """
408       
409        def get_experiment_id(state):
410            """
411            Pull the fedid experimentID out of the saved state.  This is kind
412            of a gross walk through the dict.
413            """
414
415            if state.has_key('experimentID'):
416                for e in state['experimentID']:
417                    if e.has_key('fedid'):
418                        return e['fedid']
419                else:
420                    return None
421            else:
422                return None
423
424        def get_alloc_ids(state):
425            """
426            Pull the fedids of the identifiers of each allocation from the
427            state.  Again, a dict dive that's best isolated.
428            """
429
430            return [ f['allocID']['fedid'] 
431                    for f in state.get('federant',[]) \
432                        if f.has_key('allocID') and \
433                            f['allocID'].has_key('fedid')]
434
435
436        try:
437            f = open(self.state_filename, "r")
438            self.state = pickle.load(f)
439            self.log.debug("[read_state]: Read state from %s" % \
440                    self.state_filename)
441        except IOError, e:
442            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
443                    % (self.state_filename, e))
444        except pickle.UnpicklingError, e:
445            self.log.warning(("[read_state]: No saved state: " + \
446                    "Unpickling failed: %s") % e)
447       
448        for s in self.state.values():
449            try:
450
451                eid = get_experiment_id(s)
452                if eid : 
453                    # Give the owner rights to the experiment
454                    self.auth.set_attribute(s['owner'], eid)
455                    # And holders of the eid as well
456                    self.auth.set_attribute(eid, eid)
457                    # allow overrides to control experiments as well
458                    for o in self.overrides:
459                        self.auth.set_attribute(o, eid)
460                    # Set permissions to allow reading of the software repo, if
461                    # any, as well.
462                    for a in get_alloc_ids(s):
463                        self.auth.set_attribute(a, 'repo/%s' % eid)
464                else:
465                    raise KeyError("No experiment id")
466            except KeyError, e:
467                self.log.warning("[read_state]: State ownership or identity " +\
468                        "misformatted in %s: %s" % (self.state_filename, e))
469
470
471    def read_accessdb(self, accessdb_file):
472        """
473        Read the mapping from fedids that can create experiments to their name
474        in the 3-level access namespace.  All will be asserted from this
475        testbed and can include the local username and porject that will be
476        asserted on their behalf by this fedd.  Each fedid is also added to the
477        authorization system with the "create" attribute.
478        """
479        self.accessdb = {}
480        # These are the regexps for parsing the db
481        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
482        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
483                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
484        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
485                "\s*->\s*(" + name_expr + ")\s*$")
486        lineno = 0
487
488        # Parse the mappings and store in self.authdb, a dict of
489        # fedid -> (proj, user)
490        try:
491            f = open(accessdb_file, "r")
492            for line in f:
493                lineno += 1
494                line = line.strip()
495                if len(line) == 0 or line.startswith('#'):
496                    continue
497                m = project_line.match(line)
498                if m:
499                    fid = fedid(hexstr=m.group(1))
500                    project, user = m.group(2,3)
501                    if not self.accessdb.has_key(fid):
502                        self.accessdb[fid] = []
503                    self.accessdb[fid].append((project, user))
504                    continue
505
506                m = user_line.match(line)
507                if m:
508                    fid = fedid(hexstr=m.group(1))
509                    project = None
510                    user = m.group(2)
511                    if not self.accessdb.has_key(fid):
512                        self.accessdb[fid] = []
513                    self.accessdb[fid].append((project, user))
514                    continue
515                self.log.warn("[experiment_control] Error parsing access " +\
516                        "db %s at line %d" %  (accessdb_file, lineno))
517        except IOError:
518            raise service_error(service_error.internal,
519                    "Error opening/reading %s as experiment " +\
520                            "control accessdb" %  accessdb_file)
521        f.close()
522
523        # Initialize the authorization attributes
524        for fid in self.accessdb.keys():
525            self.auth.set_attribute(fid, 'create')
526
527    def read_mapdb(self, file):
528        """
529        Read a simple colon separated list of mappings for the
530        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
531        """
532
533        self.tbmap = { }
534        lineno =0
535        try:
536            f = open(file, "r")
537            for line in f:
538                lineno += 1
539                line = line.strip()
540                if line.startswith('#') or len(line) == 0:
541                    continue
542                try:
543                    label, url = line.split(':', 1)
544                    self.tbmap[label] = url
545                except ValueError, e:
546                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
547                            "map db: %s %s" % (lineno, line, e))
548        except IOError, e:
549            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
550                    "open %s: %s" % (file, e))
551        f.close()
552
553    class emulab_segment:
554        def __init__(self, log=None, keyfile=None, debug=False):
555            self.log = log or logging.getLogger(\
556                    'fedd.experiment_control.emulab_segment')
557            self.ssh_privkey_file = keyfile
558            self.debug = debug
559            self.ssh_exec="/usr/bin/ssh"
560            self.scp_exec = "/usr/bin/scp"
561            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
562
563        def scp_file(self, file, user, host, dest=""):
564            """
565            scp a file to the remote host.  If debug is set the action is only
566            logged.
567            """
568
569            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
570                    '-o', 'StrictHostKeyChecking yes', '-i', 
571                    self.ssh_privkey_file, file, 
572                    "%s@%s:%s" % (user, host, dest)]
573            rv = 0
574
575            try:
576                dnull = open("/dev/null", "w")
577            except IOError:
578                self.log.debug("[ssh_file]: failed to open " + \
579                        "/dev/null for redirect")
580                dnull = Null
581
582            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
583            if not self.debug:
584                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
585                        close_fds=True)
586
587            return rv == 0
588
589        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
590            """
591            Run a remote command on host as user.  If debug is set, the action
592            is only logged.  Commands are run without stdin, to avoid stray
593            SIGTTINs.
594            """
595            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
596                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
597                    (self.ssh_exec, self.ssh_privkey_file, 
598                            user, host, cmd)
599
600            try:
601                dnull = open("/dev/null", "w")
602            except IOError:
603                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
604                        "for redirect")
605                dnull = Null
606
607            self.log.debug("[ssh_cmd]: %s" % sh_str)
608            if not self.debug:
609                if dnull:
610                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
611                            close_fds=True)
612                else:
613                    sub = Popen(sh_str, shell=True,
614                            close_fds=True)
615                if timeout:
616                    i = 0
617                    rv = sub.poll()
618                    while i < timeout:
619                        if rv is not None: break
620                        else:
621                            time.sleep(1)
622                            rv = sub.poll()
623                            i += 1
624                    else:
625                        self.log.debug("Process exceeded runtime: %s" % sh_str)
626                        os.kill(sub.pid, signal.SIGKILL)
627                        raise self.ssh_cmd_timeout();
628                    return rv == 0
629                else:
630                    return sub.wait() == 0
631            else:
632                if timeout == 0:
633                    self.log.debug("debug timeout raised on %s " % sh_str)
634                    raise self.ssh_cmd_timeout()
635                else:
636                    return True
637
638    class start_segment(emulab_segment):
639        def __init__(self, log=None, keyfile=None, debug=False):
640            experiment_control_local.emulab_segment.__init__(self,
641                    log=log, keyfile=keyfile, debug=debug)
642
643        def create_config_tree(self, src_dir, dest_dir, script):
644            """
645            Append commands to script that will create the directory hierarchy
646            on the remote federant.
647            """
648
649            if os.path.isdir(src_dir):
650                print >>script, "mkdir -p %s" % dest_dir
651                print >>script, "chmod 770 %s" % dest_dir
652
653                for f in os.listdir(src_dir):
654                    if os.path.isdir(f):
655                        self.create_config_tree("%s/%s" % (src_dir, f), 
656                                "%s/%s" % (dest_dir, f), script)
657            else:
658                self.log.debug("[create_config_tree]: Not a directory: %s" \
659                        % src_dir)
660
661        def ship_configs(self, host, user, src_dir, dest_dir):
662            """
663            Copy federant-specific configuration files to the federant.
664            """
665            for f in os.listdir(src_dir):
666                if os.path.isdir(f):
667                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
668                            "%s/%s" % (dest_dir, f)):
669                        return False
670                else:
671                    if not self.scp_file("%s/%s" % (src_dir, f), 
672                            user, host, dest_dir):
673                        return False
674            return True
675
676        def get_state(self, user, host, tb, pid, eid):
677            # command to test experiment state
678            expinfo_exec = "/usr/testbed/bin/expinfo" 
679            # Regular expressions to parse the expinfo response
680            state_re = re.compile("State:\s+(\w+)")
681            no_exp_re = re.compile("^No\s+such\s+experiment")
682            swapping_re = re.compile("^No\s+information\s+available.")
683            state = None    # Experiment state parsed from expinfo
684            # The expinfo ssh command.  Note the identity restriction to use
685            # only the identity provided in the pubkey given.
686            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
687                    'StrictHostKeyChecking yes', '-i', 
688                    self.ssh_privkey_file, "%s@%s" % (user, host), 
689                    expinfo_exec, pid, eid]
690
691            dev_null = None
692            try:
693                dev_null = open("/dev/null", "a")
694            except IOError, e:
695                self.log.error("[get_state]: can't open /dev/null: %s" %e)
696
697            if self.debug:
698                state = 'swapped'
699                rv = 0
700            else:
701                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
702                        close_fds=True)
703                for line in status.stdout:
704                    m = state_re.match(line)
705                    if m: state = m.group(1)
706                    else:
707                        for reg, st in ((no_exp_re, "none"),
708                                (swapping_re, "swapping")):
709                            m = reg.match(line)
710                            if m: state = st
711                rv = status.wait()
712
713            # If the experiment is not present the subcommand returns a
714            # non-zero return value.  If we successfully parsed a "none"
715            # outcome, ignore the return code.
716            if rv != 0 and state != 'none':
717                raise service_error(service_error.internal,
718                        "Cannot get status of segment %s:%s/%s" % \
719                                (tb, pid, eid))
720            elif state not in ('active', 'swapped', 'swapping', 'none'):
721                raise service_error(service_error.internal,
722                        "Cannot get status of segment %s:%s/%s" % \
723                                (tb, pid, eid))
724            else: return state
725
726
727        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
728            """
729            Start a sub-experiment on a federant.
730
731            Get the current state, modify or create as appropriate, ship data
732            and configs and start the experiment.  There are small ordering
733            differences based on the initial state of the sub-experiment.
734            """
735            # ops node in the federant
736            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
737            user = tbparams[tb]['user']     # federant user
738            pid = tbparams[tb]['project']   # federant project
739            # XXX
740            base_confs = ( "hosts",)
741            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
742            # Configuration directories on the remote machine
743            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
744            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
745            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
746
747            state = self.get_state(user, host, tb, pid, eid)
748
749            self.log.debug("[start_segment]: %s: %s" % (tb, state))
750            self.log.info("[start_segment]:transferring experiment to %s" % tb)
751
752            if not self.scp_file("%s/%s/%s" % \
753                    (tmpdir, tb, tclfile), user, host):
754                return False
755           
756            if state == 'none':
757                # Create a null copy of the experiment so that we capture any
758                # logs there if the modify fails.  Emulab software discards the
759                # logs from a failed startexp
760                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
761                    return False
762                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
763                timedout = False
764                try:
765                    if not self.ssh_cmd(user, host,
766                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
767                            "-e %s null.tcl") % (pid, eid), "startexp",
768                            timeout=60 * 10):
769                        return False
770                except self.ssh_cmd_timeout:
771                    timedout = True
772
773                if timedout:
774                    state = self.get_state(user, host, tb, pid, eid)
775                    if state != "swapped":
776                        return False
777
778           
779            # Open up a temporary file to contain a script for setting up the
780            # filespace for the new experiment.
781            self.log.info("[start_segment]: creating script file")
782            try:
783                sf, scriptname = tempfile.mkstemp()
784                scriptfile = os.fdopen(sf, 'w')
785            except IOError:
786                return False
787
788            scriptbase = os.path.basename(scriptname)
789
790            # Script the filesystem changes
791            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
792            # Clear and create the tarfiles and rpm directories
793            for d in (tarfiles_dir, rpms_dir):
794                print >>scriptfile, "/bin/rm -rf %s/*" % d
795                print >>scriptfile, "mkdir -p %s" % d
796            print >>scriptfile, 'mkdir -p %s' % proj_dir
797            self.create_config_tree("%s/%s" % (tmpdir, tb),
798                    proj_dir, scriptfile)
799            if os.path.isdir("%s/tarfiles" % tmpdir):
800                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
801                        scriptfile)
802            if os.path.isdir("%s/rpms" % tmpdir):
803                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
804                        scriptfile)
805            print >>scriptfile, "rm -f %s" % scriptbase
806            scriptfile.close()
807
808            # Move the script to the remote machine
809            # XXX: could collide tempfile names on the remote host
810            if self.scp_file(scriptname, user, host, scriptbase):
811                os.remove(scriptname)
812            else:
813                return False
814
815            # Execute the script (and the script's last line deletes it)
816            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
817                return False
818
819            for f in base_confs:
820                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
821                        "%s/%s" % (proj_dir, f)):
822                    return False
823            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
824                    proj_dir):
825                return False
826            if os.path.isdir("%s/tarfiles" % tmpdir):
827                if not self.ship_configs(host, user,
828                        "%s/tarfiles" % tmpdir, tarfiles_dir):
829                    return False
830            if os.path.isdir("%s/rpms" % tmpdir):
831                if not self.ship_configs(host, user,
832                        "%s/rpms" % tmpdir, tarfiles_dir):
833                    return False
834            # Stage the new configuration (active experiments will stay swapped
835            # in now)
836            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
837            try:
838                if not self.ssh_cmd(user, host,
839                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
840                                (pid, eid, tclfile),
841                        "modexp", timeout= 60 * 10):
842                    return False
843            except self.ssh_cmd_timeout:
844                self.log.error("Modify command failed to complete in time")
845                # There's really no way to see if this succeeded or failed, so
846                # if it hangs, assume the worst.
847                return False
848            # Active experiments are still swapped, this swaps the others in.
849            if state != 'active':
850                self.log.info("[start_segment]: Swapping %s in on %s" % \
851                        (eid, tb))
852                timedout = False
853                try:
854                    if not self.ssh_cmd(user, host,
855                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
856                            "swapexp", timeout=10*60):
857                        return False
858                except self.ssh_cmd_timeout:
859                    timedout = True
860               
861                # If the command was terminated, but completed successfully,
862                # report success.
863                if timedout:
864                    self.log.debug("[start_segment]: swapin timed out " +\
865                            "checking state")
866                    state = self.get_state(user, host, tb, pid, eid)
867                    self.log.debug("[start_segment]: state is %s" % state)
868                    return state == 'active'
869            # Everything has gone OK.
870            return True
871
872    class stop_segment(emulab_segment):
873        def __init__(self, log=None, keyfile=None, debug=False):
874            experiment_control_local.emulab_segment.__init__(self,
875                    log=log, keyfile=keyfile, debug=debug)
876
877        def __call__(self, tb, eid, tbparams):
878            """
879            Stop a sub experiment by calling swapexp on the federant
880            """
881            user = tbparams[tb]['user']
882            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
883            pid = tbparams[tb]['project']
884
885            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
886            rv = False
887            try:
888                # Clean out tar files: we've gone over quota in the past
889                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
890                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
891                        (pid, eid))
892                rv = self.ssh_cmd(user, host,
893                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
894            except self.ssh_cmd_timeout:
895                rv = False
896            return rv
897
898       
899    def generate_ssh_keys(self, dest, type="rsa" ):
900        """
901        Generate a set of keys for the gateways to use to talk.
902
903        Keys are of type type and are stored in the required dest file.
904        """
905        valid_types = ("rsa", "dsa")
906        t = type.lower();
907        if t not in valid_types: raise ValueError
908        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
909
910        try:
911            trace = open("/dev/null", "w")
912        except IOError:
913            raise service_error(service_error.internal,
914                    "Cannot open /dev/null??");
915
916        # May raise CalledProcessError
917        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
918        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
919        if rv != 0:
920            raise service_error(service_error.internal, 
921                    "Cannot generate nonce ssh keys.  %s return code %d" \
922                            % (self.ssh_keygen, rv))
923
924    def gentopo(self, str):
925        """
926        Generate the topology dtat structure from the splitter's XML
927        representation of it.
928
929        The topology XML looks like:
930            <experiment>
931                <nodes>
932                    <node><vname></vname><ips>ip1:ip2</ips></node>
933                </nodes>
934                <lans>
935                    <lan>
936                        <vname></vname><vnode></vnode><ip></ip>
937                        <bandwidth></bandwidth><member>node:port</member>
938                    </lan>
939                </lans>
940        """
941        class topo_parse:
942            """
943            Parse the topology XML and create the dats structure.
944            """
945            def __init__(self):
946                # Typing of the subelements for data conversion
947                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
948                self.int_subelements = ( 'bandwidth',)
949                self.float_subelements = ( 'delay',)
950                # The final data structure
951                self.nodes = [ ]
952                self.lans =  [ ]
953                self.topo = { \
954                        'node': self.nodes,\
955                        'lan' : self.lans,\
956                    }
957                self.element = { }  # Current element being created
958                self.chars = ""     # Last text seen
959
960            def end_element(self, name):
961                # After each sub element the contents is added to the current
962                # element or to the appropriate list.
963                if name == 'node':
964                    self.nodes.append(self.element)
965                    self.element = { }
966                elif name == 'lan':
967                    self.lans.append(self.element)
968                    self.element = { }
969                elif name in self.str_subelements:
970                    self.element[name] = self.chars
971                    self.chars = ""
972                elif name in self.int_subelements:
973                    self.element[name] = int(self.chars)
974                    self.chars = ""
975                elif name in self.float_subelements:
976                    self.element[name] = float(self.chars)
977                    self.chars = ""
978
979            def found_chars(self, data):
980                self.chars += data.rstrip()
981
982
983        tp = topo_parse();
984        parser = xml.parsers.expat.ParserCreate()
985        parser.EndElementHandler = tp.end_element
986        parser.CharacterDataHandler = tp.found_chars
987
988        parser.Parse(str)
989
990        return tp.topo
991       
992
993    def genviz(self, topo):
994        """
995        Generate the visualization the virtual topology
996        """
997
998        neato = "/usr/local/bin/neato"
999        # These are used to parse neato output and to create the visualization
1000        # file.
1001        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
1002        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
1003                "%s</type></node>"
1004
1005        try:
1006            # Node names
1007            nodes = [ n['vname'] for n in topo['node'] ]
1008            topo_lans = topo['lan']
1009        except KeyError, e:
1010            raise service_error(service_error.internal, "Bad topology: %s" %e)
1011
1012        lans = { }
1013        links = { }
1014
1015        # Walk through the virtual topology, organizing the connections into
1016        # 2-node connections (links) and more-than-2-node connections (lans).
1017        # When a lan is created, it's added to the list of nodes (there's a
1018        # node in the visualization for the lan).
1019        for l in topo_lans:
1020            if links.has_key(l['vname']):
1021                if len(links[l['vname']]) < 2:
1022                    links[l['vname']].append(l['vnode'])
1023                else:
1024                    nodes.append(l['vname'])
1025                    lans[l['vname']] = links[l['vname']]
1026                    del links[l['vname']]
1027                    lans[l['vname']].append(l['vnode'])
1028            elif lans.has_key(l['vname']):
1029                lans[l['vname']].append(l['vnode'])
1030            else:
1031                links[l['vname']] = [ l['vnode'] ]
1032
1033
1034        # Open up a temporary file for dot to turn into a visualization
1035        try:
1036            df, dotname = tempfile.mkstemp()
1037            dotfile = os.fdopen(df, 'w')
1038        except IOError:
1039            raise service_error(service_error.internal,
1040                    "Failed to open file in genviz")
1041
1042        try:
1043            dnull = open('/dev/null', 'w')
1044        except IOError:
1045            service_error(service_error.internal,
1046                    "Failed to open /dev/null in genviz")
1047
1048        # Generate a dot/neato input file from the links, nodes and lans
1049        try:
1050            print >>dotfile, "graph G {"
1051            for n in nodes:
1052                print >>dotfile, '\t"%s"' % n
1053            for l in links.keys():
1054                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1055            for l in lans.keys():
1056                for n in lans[l]:
1057                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1058            print >>dotfile, "}"
1059            dotfile.close()
1060        except TypeError:
1061            raise service_error(service_error.internal,
1062                    "Single endpoint link in vtopo")
1063        except IOError:
1064            raise service_error(service_error.internal, "Cannot write dot file")
1065
1066        # Use dot to create a visualization
1067        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
1068                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1069                close_fds=True)
1070        dnull.close()
1071
1072        # Translate dot to vis format
1073        vis_nodes = [ ]
1074        vis = { 'node': vis_nodes }
1075        for line in dot.stdout:
1076            m = vis_re.match(line)
1077            if m:
1078                vn = m.group(1)
1079                vis_node = {'name': vn, \
1080                        'x': float(m.group(2)),\
1081                        'y' : float(m.group(3)),\
1082                    }
1083                if vn in links.keys() or vn in lans.keys():
1084                    vis_node['type'] = 'lan'
1085                else:
1086                    vis_node['type'] = 'node'
1087                vis_nodes.append(vis_node)
1088        rv = dot.wait()
1089
1090        os.remove(dotname)
1091        if rv == 0 : return vis
1092        else: return None
1093
1094    def get_access(self, tb, nodes, user, tbparam, master, export_project,
1095            access_user):
1096        """
1097        Get access to testbed through fedd and set the parameters for that tb
1098        """
1099        uri = self.tbmap.get(tb, None)
1100        if not uri:
1101            raise service_error(serice_error.server_config, 
1102                    "Unknown testbed: %s" % tb)
1103
1104        # currently this lumps all users into one service access group
1105        service_keys = [ a for u in user \
1106                for a in u.get('access', []) \
1107                    if a.has_key('sshPubkey')]
1108
1109        if len(service_keys) == 0:
1110            raise service_error(service_error.req, 
1111                    "Must have at least one SSH pubkey for services")
1112
1113
1114        for p, u in access_user:
1115            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1116                    "to %s") %  ((p or "None"), u, uri))
1117
1118            if p:
1119                # Request with user and project specified
1120                req = {\
1121                        'destinationTestbed' : { 'uri' : uri },
1122                        'project': { 
1123                            'name': {'localname': p},
1124                            'user': [ {'userID': { 'localname': u } } ],
1125                            },
1126                        'user':  user,
1127                        'allocID' : { 'localname': 'test' },
1128                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1129                        'serviceAccess' : service_keys
1130                    }
1131            else:
1132                # Request with only user specified
1133                req = {\
1134                        'destinationTestbed' : { 'uri' : uri },
1135                        'user':  [ {'userID': { 'localname': u } } ],
1136                        'allocID' : { 'localname': 'test' },
1137                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1138                        'serviceAccess' : service_keys
1139                    }
1140
1141            if tb == master:
1142                # NB, the export_project parameter is a dict that includes
1143                # the type
1144                req['exportProject'] = export_project
1145
1146            # node resources if any
1147            if nodes != None and len(nodes) > 0:
1148                rnodes = [ ]
1149                for n in nodes:
1150                    rn = { }
1151                    image, hw, count = n.split(":")
1152                    if image: rn['image'] = [ image ]
1153                    if hw: rn['hardware'] = [ hw ]
1154                    if count and int(count) >0 : rn['count'] = int(count)
1155                    rnodes.append(rn)
1156                req['resources']= { }
1157                req['resources']['node'] = rnodes
1158
1159            try:
1160                if self.local_access.has_key(uri):
1161                    # Local access call
1162                    req = { 'RequestAccessRequestBody' : req }
1163                    r = self.local_access[uri].RequestAccess(req, 
1164                            fedid(file=self.cert_file))
1165                    r = { 'RequestAccessResponseBody' : r }
1166                else:
1167                    r = self.call_RequestAccess(uri, req, 
1168                            self.cert_file, self.cert_pwd, self.trusted_certs)
1169            except service_error, e:
1170                if e.code == service_error.access:
1171                    self.log.debug("[get_access] Access denied")
1172                    r = None
1173                    continue
1174                else:
1175                    raise e
1176
1177            if r.has_key('RequestAccessResponseBody'):
1178                # Through to here we have a valid response, not a fault.
1179                # Access denied is a fault, so something better or worse than
1180                # access denied has happened.
1181                r = r['RequestAccessResponseBody']
1182                self.log.debug("[get_access] Access granted")
1183                break
1184            else:
1185                raise service_error(service_error.protocol,
1186                        "Bad proxy response")
1187       
1188        if not r:
1189            raise service_error(service_error.access, 
1190                    "Access denied by %s (%s)" % (tb, uri))
1191
1192        e = r['emulab']
1193        p = e['project']
1194        tbparam[tb] = { 
1195                "boss": e['boss'],
1196                "host": e['ops'],
1197                "domain": e['domain'],
1198                "fs": e['fileServer'],
1199                "eventserver": e['eventServer'],
1200                "project": unpack_id(p['name']),
1201                "emulab" : e,
1202                "allocID" : r['allocID'],
1203                }
1204        # Make the testbed name be the label the user applied
1205        p['testbed'] = {'localname': tb }
1206
1207        for u in p['user']:
1208            role = u.get('role', None)
1209            if role == 'experimentCreation':
1210                tbparam[tb]['user'] = unpack_id(u['userID'])
1211                break
1212        else:
1213            raise service_error(service_error.internal, 
1214                    "No createExperimentUser from %s" %tb)
1215
1216        # Add attributes to barameter space.  We don't allow attributes to
1217        # overlay any parameters already installed.
1218        for a in e['fedAttr']:
1219            try:
1220                if a['attribute'] and isinstance(a['attribute'], basestring)\
1221                        and not tbparam[tb].has_key(a['attribute'].lower()):
1222                    tbparam[tb][a['attribute'].lower()] = a['value']
1223            except KeyError:
1224                self.log.error("Bad attribute in response: %s" % a)
1225       
1226    def release_access(self, tb, aid):
1227        """
1228        Release access to testbed through fedd
1229        """
1230
1231        uri = self.tbmap.get(tb, None)
1232        if not uri:
1233            raise service_error(serice_error.server_config, 
1234                    "Unknown testbed: %s" % tb)
1235
1236        if self.local_access.has_key(uri):
1237            resp = self.local_access[uri].ReleaseAccess(\
1238                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1239                    fedid(file=self.cert_file))
1240            resp = { 'ReleaseAccessResponseBody': resp } 
1241        else:
1242            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1243                    self.cert_file, self.cert_pwd, self.trusted_certs)
1244
1245        # better error coding
1246
1247    def remote_splitter(self, uri, desc, master):
1248
1249        req = {
1250                'description' : { 'ns2description': desc },
1251                'master': master,
1252                'include_fedkit': bool(self.fedkit),
1253                'include_gatewaykit': bool(self.gatewaykit)
1254            }
1255
1256        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1257                self.trusted_certs)
1258
1259        if r.has_key('Ns2SplitResponseBody'):
1260            r = r['Ns2SplitResponseBody']
1261            if r.has_key('output'):
1262                return r['output'].splitlines()
1263            else:
1264                raise service_error(service_error.protocol, 
1265                        "Bad splitter response (no output)")
1266        else:
1267            raise service_error(service_error.protocol, "Bad splitter response")
1268       
1269    class current_testbed:
1270        """
1271        Object for collecting the current testbed description.  The testbed
1272        description is saved to a file with the local testbed variables
1273        subsittuted line by line.
1274        """
1275        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1276            def tar_list_to_string(tl):
1277                if tl is None: return None
1278
1279                rv = ""
1280                for t in tl:
1281                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1282                            (t[0], os.path.basename(t[1]))
1283                return rv
1284
1285
1286            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1287            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1288            self.current_testbed = None
1289            self.testbed_file = None
1290
1291            self.def_expstart = \
1292                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1293            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1294            self.def_gwstart = \
1295                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1296            self.def_mgwstart = \
1297                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1298            self.def_gwimage = "FBSD61-TUNNEL2";
1299            self.def_gwtype = "pc";
1300            self.def_mgwcmd = '# '
1301            self.def_mgwcmdparams = ''
1302            self.def_gwcmd = '# '
1303            self.def_gwcmdparams = ''
1304
1305            self.eid = eid
1306            self.tmpdir = tmpdir
1307            # Convert fedkit and gateway kit (which are lists of tuples) into a
1308            # substituition string.
1309            self.fedkit = tar_list_to_string(fedkit)
1310            self.gatewaykit = tar_list_to_string(gatewaykit)
1311
1312        def __call__(self, line, master, allocated, tbparams):
1313            # Capture testbed topology descriptions
1314            if self.current_testbed == None:
1315                m = self.begin_testbed.match(line)
1316                if m != None:
1317                    self.current_testbed = m.group(1)
1318                    if self.current_testbed == None:
1319                        raise service_error(service_error.req,
1320                                "Bad request format (unnamed testbed)")
1321                    allocated[self.current_testbed] = \
1322                            allocated.get(self.current_testbed,0) + 1
1323                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1324                    if not os.path.exists(tb_dir):
1325                        try:
1326                            os.mkdir(tb_dir)
1327                        except IOError:
1328                            raise service_error(service_error.internal,
1329                                    "Cannot create %s" % tb_dir)
1330                    try:
1331                        self.testbed_file = open("%s/%s.%s.tcl" %
1332                                (tb_dir, self.eid, self.current_testbed), 'w')
1333                    except IOError:
1334                        self.testbed_file = None
1335                    return True
1336                else: return False
1337            else:
1338                m = self.end_testbed.match(line)
1339                if m != None:
1340                    if m.group(1) != self.current_testbed:
1341                        raise service_error(service_error.internal, 
1342                                "Mismatched testbed markers!?")
1343                    if self.testbed_file != None: 
1344                        self.testbed_file.close()
1345                        self.testbed_file = None
1346                    self.current_testbed = None
1347                elif self.testbed_file:
1348                    # Substitute variables and put the line into the local
1349                    # testbed file.
1350                    gwtype = tbparams[self.current_testbed].get(\
1351                            'connectortype', self.def_gwtype)
1352                    gwimage = tbparams[self.current_testbed].get(\
1353                            'connectorimage', self.def_gwimage)
1354                    mgwstart = tbparams[self.current_testbed].get(\
1355                            'masterconnectorstartcmd', self.def_mgwstart)
1356                    mexpstart = tbparams[self.current_testbed].get(\
1357                            'masternodestartcmd', self.def_mexpstart)
1358                    gwstart = tbparams[self.current_testbed].get(\
1359                            'slaveconnectorstartcmd', self.def_gwstart)
1360                    expstart = tbparams[self.current_testbed].get(\
1361                            'slavenodestartcmd', self.def_expstart)
1362                    project = tbparams[self.current_testbed].get('project')
1363                    gwcmd = tbparams[self.current_testbed].get(\
1364                            'slaveconnectorcmd', self.def_gwcmd)
1365                    gwcmdparams = tbparams[self.current_testbed].get(\
1366                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1367                    mgwcmd = tbparams[self.current_testbed].get(\
1368                            'masterconnectorcmd', self.def_gwcmd)
1369                    mgwcmdparams = tbparams[self.current_testbed].get(\
1370                            'masterconnectorcmdparams', self.def_gwcmdparams)
1371                    line = re.sub("GWTYPE", gwtype, line)
1372                    line = re.sub("GWIMAGE", gwimage, line)
1373                    if self.current_testbed == master:
1374                        line = re.sub("GWSTART", mgwstart, line)
1375                        line = re.sub("EXPSTART", mexpstart, line)
1376                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1377                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1378                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1379                    else:
1380                        line = re.sub("GWSTART", gwstart, line)
1381                        line = re.sub("EXPSTART", expstart, line)
1382                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1383                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1384                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1385                    #These expansions contain EID and PROJDIR.  NB these are
1386                    # local fedkit and gatewaykit, which are strings.
1387                    if self.fedkit:
1388                        line = re.sub("FEDKIT", self.fedkit, line)
1389                    if self.gatewaykit:
1390                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1391                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1392                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1393                    line = re.sub("EID", self.eid, line)
1394                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1395                            (project, self.eid), line)
1396                    print >>self.testbed_file, line
1397                return True
1398
1399    class allbeds:
1400        """
1401        Process the Allbeds section.  Get access to each federant and save the
1402        parameters in tbparams
1403        """
1404        def __init__(self, get_access):
1405            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1406            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1407            self.in_allbeds = False
1408            self.get_access = get_access
1409
1410        def __call__(self, line, user, tbparams, master, export_project,
1411                access_user):
1412            # Testbed access parameters
1413            if not self.in_allbeds:
1414                if self.begin_allbeds.match(line):
1415                    self.in_allbeds = True
1416                    return True
1417                else:
1418                    return False
1419            else:
1420                if self.end_allbeds.match(line):
1421                    self.in_allbeds = False
1422                else:
1423                    nodes = line.split('|')
1424                    tb = nodes.pop(0)
1425                    self.get_access(tb, nodes, user, tbparams, master,
1426                            export_project, access_user)
1427                return True
1428
1429    class gateways:
1430        def __init__(self, eid, master, tmpdir, gw_pubkey,
1431                gw_secretkey, copy_file, fedkit):
1432            self.begin_gateways = \
1433                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1434            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1435            self.current_gateways = None
1436            self.control_gateway = None
1437            self.active_end = { }
1438
1439            self.eid = eid
1440            self.master = master
1441            self.tmpdir = tmpdir
1442            self.gw_pubkey_base = gw_pubkey
1443            self.gw_secretkey_base = gw_secretkey
1444
1445            self.copy_file = copy_file
1446            self.fedkit = fedkit
1447
1448
1449        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1450                active_end, tbparams, dtb, myname, desthost, type):
1451            """
1452            Produce a gateway configuration file from a gateways line.
1453            """
1454
1455            sproject = tbparams[gw].get('project', 'project')
1456            dproject = tbparams[dtb].get('project', 'project')
1457            sdomain = ".%s.%s%s" % (eid, sproject,
1458                    tbparams[gw].get('domain', ".example.com"))
1459            ddomain = ".%s.%s%s" % (eid, dproject,
1460                    tbparams[dtb].get('domain', ".example.com"))
1461            boss = tbparams[master].get('boss', "boss")
1462            fs = tbparams[master].get('fs', "fs")
1463            event_server = "%s%s" % \
1464                    (tbparams[gw].get('eventserver', "event_server"),
1465                            tbparams[gw].get('domain', "example.com"))
1466            remote_event_server = "%s%s" % \
1467                    (tbparams[dtb].get('eventserver', "event_server"),
1468                            tbparams[dtb].get('domain', "example.com"))
1469            seer_control = "%s%s" % \
1470                    (tbparams[gw].get('control', "control"), sdomain)
1471            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1472
1473            if self.fedkit:
1474                remote_script_dir = "/usr/local/federation/bin"
1475                local_script_dir = "/usr/local/federation/bin"
1476            else:
1477                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1478                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1479
1480            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1481            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1482            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1483
1484            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1485            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1486
1487            # translate to lower case so the `hostname` hack for specifying
1488            # configuration files works.
1489            conf_file = conf_file.lower();
1490            remote_conf_file = remote_conf_file.lower();
1491
1492            if dtb == master:
1493                active = "false"
1494            elif gw == master:
1495                active = "true"
1496            elif active_end.has_key('%s-%s' % (dtb, gw)):
1497                active = "false"
1498            else:
1499                active_end['%s-%s' % (gw, dtb)] = 1
1500                active = "true"
1501
1502            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1503            print >>gwconfig, "Active: %s" % active
1504            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1505            if tunnel_iface:
1506                print >>gwconfig, "Interface: %s" % tunnel_iface
1507            print >>gwconfig, "BossName: %s" % boss
1508            print >>gwconfig, "FsName: %s" % fs
1509            print >>gwconfig, "EventServerName: %s" % event_server
1510            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1511            print >>gwconfig, "SeerControl: %s" % seer_control
1512            print >>gwconfig, "Type: %s" % type
1513            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1514            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1515                    local_script_dir
1516            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1517            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1518            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1519                    (remote_conf_dir, remote_conf_file)
1520            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1521            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1522            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1523            gwconfig.close()
1524
1525            return active == "true"
1526
1527        def __call__(self, line, allocated, tbparams):
1528            # Process gateways
1529            if not self.current_gateways:
1530                m = self.begin_gateways.match(line)
1531                if m:
1532                    self.current_gateways = m.group(1)
1533                    if allocated.has_key(self.current_gateways):
1534                        # This test should always succeed
1535                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1536                        if not os.path.exists(tb_dir):
1537                            try:
1538                                os.mkdir(tb_dir)
1539                            except IOError:
1540                                raise service_error(service_error.internal,
1541                                        "Cannot create %s" % tb_dir)
1542                    else:
1543                        # XXX
1544                        self.log.error("[gateways]: Ignoring gateways for " + \
1545                                "unknown testbed %s" % self.current_gateways)
1546                        self.current_gateways = None
1547                    return True
1548                else:
1549                    return False
1550            else:
1551                m = self.end_gateways.match(line)
1552                if m :
1553                    if m.group(1) != self.current_gateways:
1554                        raise service_error(service_error.internal,
1555                                "Mismatched gateway markers!?")
1556                    if self.control_gateway:
1557                        try:
1558                            cc = open("%s/%s/client.conf" %
1559                                    (self.tmpdir, self.current_gateways), 'w')
1560                            print >>cc, "ControlGateway: %s" % \
1561                                    self.control_gateway
1562                            if tbparams[self.master].has_key('smbshare'):
1563                                print >>cc, "SMBSHare: %s" % \
1564                                        tbparams[self.master]['smbshare']
1565                            print >>cc, "ProjectUser: %s" % \
1566                                    tbparams[self.master]['user']
1567                            print >>cc, "ProjectName: %s" % \
1568                                    tbparams[self.master]['project']
1569                            print >>cc, "ExperimentID: %s/%s" % \
1570                                    ( tbparams[self.master]['project'], \
1571                                    self.eid )
1572                            cc.close()
1573                        except IOError:
1574                            raise service_error(service_error.internal,
1575                                    "Error creating client config")
1576                        # XXX: This seer specific file should disappear
1577                        try:
1578                            cc = open("%s/%s/seer.conf" %
1579                                    (self.tmpdir, self.current_gateways),
1580                                    'w')
1581                            if self.current_gateways != self.master:
1582                                print >>cc, "ControlNode: %s" % \
1583                                        self.control_gateway
1584                            print >>cc, "ExperimentID: %s/%s" % \
1585                                    ( tbparams[self.master]['project'], \
1586                                    self.eid )
1587                            cc.close()
1588                        except IOError:
1589                            raise service_error(service_error.internal,
1590                                    "Error creating seer config")
1591                    else:
1592                        debug.error("[gateways]: No control gateway for %s" %\
1593                                    self.current_gateways)
1594                    self.current_gateways = None
1595                else:
1596                    dtb, myname, desthost, type = line.split(" ")
1597
1598                    if type == "control" or type == "both":
1599                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1600                                self.eid, 
1601                                tbparams[self.current_gateways]['project'],
1602                                tbparams[self.current_gateways]['domain'])
1603                    try:
1604                        active = self.gateway_conf_file(self.current_gateways,
1605                                self.master, self.eid, self.gw_pubkey_base,
1606                                self.gw_secretkey_base,
1607                                self.active_end, tbparams, dtb, myname,
1608                                desthost, type)
1609                    except IOError, e:
1610                        raise service_error(service_error.internal,
1611                                "Failed to write config file for %s" % \
1612                                        self.current_gateway)
1613           
1614                    gw_pubkey = "%s/keys/%s" % \
1615                            (self.tmpdir, self.gw_pubkey_base)
1616                    gw_secretkey = "%s/keys/%s" % \
1617                            (self.tmpdir, self.gw_secretkey_base)
1618
1619                    pkfile = "%s/%s/%s" % \
1620                            ( self.tmpdir, self.current_gateways, 
1621                                    self.gw_pubkey_base)
1622                    skfile = "%s/%s/%s" % \
1623                            ( self.tmpdir, self.current_gateways, 
1624                                    self.gw_secretkey_base)
1625
1626                    if not os.path.exists(pkfile):
1627                        try:
1628                            self.copy_file(gw_pubkey, pkfile)
1629                        except IOError:
1630                            service_error(service_error.internal,
1631                                    "Failed to copy pubkey file")
1632
1633                    if active and not os.path.exists(skfile):
1634                        try:
1635                            self.copy_file(gw_secretkey, skfile)
1636                        except IOError:
1637                            service_error(service_error.internal,
1638                                    "Failed to copy secretkey file")
1639                return True
1640
1641    class shunt_to_file:
1642        """
1643        Simple class to write data between two regexps to a file.
1644        """
1645        def __init__(self, begin, end, filename):
1646            """
1647            Begin shunting on a match of begin, stop on end, send data to
1648            filename.
1649            """
1650            self.begin = re.compile(begin)
1651            self.end = re.compile(end)
1652            self.in_shunt = False
1653            self.file = None
1654            self.filename = filename
1655
1656        def __call__(self, line):
1657            """
1658            Call this on each line in the input that may be shunted.
1659            """
1660            if not self.in_shunt:
1661                if self.begin.match(line):
1662                    self.in_shunt = True
1663                    try:
1664                        self.file = open(self.filename, "w")
1665                    except:
1666                        self.file = None
1667                        raise
1668                    return True
1669                else:
1670                    return False
1671            else:
1672                if self.end.match(line):
1673                    if self.file: 
1674                        self.file.close()
1675                        self.file = None
1676                    self.in_shunt = False
1677                else:
1678                    if self.file:
1679                        print >>self.file, line
1680                return True
1681
1682    class shunt_to_list:
1683        """
1684        Same interface as shunt_to_file.  Data collected in self.list, one list
1685        element per line.
1686        """
1687        def __init__(self, begin, end):
1688            self.begin = re.compile(begin)
1689            self.end = re.compile(end)
1690            self.in_shunt = False
1691            self.list = [ ]
1692       
1693        def __call__(self, line):
1694            if not self.in_shunt:
1695                if self.begin.match(line):
1696                    self.in_shunt = True
1697                    return True
1698                else:
1699                    return False
1700            else:
1701                if self.end.match(line):
1702                    self.in_shunt = False
1703                else:
1704                    self.list.append(line)
1705                return True
1706
1707    class shunt_to_string:
1708        """
1709        Same interface as shunt_to_file.  Data collected in self.str, all in
1710        one string.
1711        """
1712        def __init__(self, begin, end):
1713            self.begin = re.compile(begin)
1714            self.end = re.compile(end)
1715            self.in_shunt = False
1716            self.str = ""
1717       
1718        def __call__(self, line):
1719            if not self.in_shunt:
1720                if self.begin.match(line):
1721                    self.in_shunt = True
1722                    return True
1723                else:
1724                    return False
1725            else:
1726                if self.end.match(line):
1727                    self.in_shunt = False
1728                else:
1729                    self.str += line
1730                return True
1731
1732    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1733            tbparams, tmpdir, alloc_log=None):
1734        started = { }           # Testbeds where a sub-experiment started
1735                                # successfully
1736
1737        # XXX
1738        fail_soft = False
1739
1740        log = alloc_log or self.log
1741
1742        thread_pool = self.thread_pool(self.nthreads)
1743        threads = [ ]
1744
1745        for tb in [ k for k in allocated.keys() if k != master]:
1746            # Create and start a thread to start the segment, and save it to
1747            # get the return value later
1748            thread_pool.wait_for_slot()
1749            t  = self.pooled_thread(\
1750                    target=self.start_segment(log=log,
1751                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1752                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1753                    pdata=thread_pool, trace_file=self.trace_file)
1754            threads.append(t)
1755            t.start()
1756
1757        # Wait until all finish
1758        thread_pool.wait_for_all_done()
1759
1760        # If none failed, start the master
1761        failed = [ t.getName() for t in threads if not t.rv ]
1762
1763        if len(failed) == 0:
1764            starter = self.start_segment(log=log, 
1765                    keyfile=self.ssh_privkey_file, debug=self.debug)
1766            if not starter(master, eid, tbparams, tmpdir):
1767                failed.append(master)
1768
1769        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1770        # If one failed clean up, unless fail_soft is set
1771        if failed:
1772            if not fail_soft:
1773                thread_pool.clear()
1774                for tb in succeeded:
1775                    # Create and start a thread to stop the segment
1776                    thread_pool.wait_for_slot()
1777                    t  = self.pooled_thread(\
1778                            target=self.stop_segment(log=log,
1779                                keyfile=self.ssh_privkey_file,
1780                                debug=self.debug), 
1781                            args=(tb, eid, tbparams), name=tb,
1782                            pdata=thread_pool, trace_file=self.trace_file)
1783                    t.start()
1784                # Wait until all finish
1785                thread_pool.wait_for_all_done()
1786
1787                # release the allocations
1788                for tb in tbparams.keys():
1789                    self.release_access(tb, tbparams[tb]['allocID'])
1790                # Remove the placeholder
1791                self.state_lock.acquire()
1792                self.state[eid]['experimentStatus'] = 'failed'
1793                if self.state_filename: self.write_state()
1794                self.state_lock.release()
1795
1796                #raise service_error(service_error.federant,
1797                #    "Swap in failed on %s" % ",".join(failed))
1798                log.error("Swap in failed on %s" % ",".join(failed))
1799                return
1800        else:
1801            log.info("[start_segment]: Experiment %s active" % eid)
1802
1803        log.debug("[start_experiment]: removing %s" % tmpdir)
1804
1805        # Walk up tmpdir, deleting as we go
1806        for path, dirs, files in os.walk(tmpdir, topdown=False):
1807            for f in files:
1808                os.remove(os.path.join(path, f))
1809            for d in dirs:
1810                os.rmdir(os.path.join(path, d))
1811        os.rmdir(tmpdir)
1812
1813        # Insert the experiment into our state and update the disk copy
1814        self.state_lock.acquire()
1815        self.state[expid]['experimentStatus'] = 'active'
1816        self.state[eid] = self.state[expid]
1817        if self.state_filename: self.write_state()
1818        self.state_lock.release()
1819        return
1820
1821    def create_experiment(self, req, fid):
1822        """
1823        The external interface to experiment creation called from the
1824        dispatcher.
1825
1826        Creates a working directory, splits the incoming description using the
1827        splitter script and parses out the avrious subsections using the
1828        lcasses above.  Once each sub-experiment is created, use pooled threads
1829        to instantiate them and start it all up.
1830        """
1831
1832        if not self.auth.check_attribute(fid, 'create'):
1833            raise service_error(service_error.access, "Create access denied")
1834
1835        try:
1836            tmpdir = tempfile.mkdtemp(prefix="split-")
1837        except IOError:
1838            raise service_error(service_error.internal, "Cannot create tmp dir")
1839
1840        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1841        gw_secretkey_base = "fed.%s" % self.ssh_type
1842        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1843        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1844        tclfile = tmpdir + "/experiment.tcl"
1845        tbparams = { }
1846        try:
1847            access_user = self.accessdb[fid]
1848        except KeyError:
1849            raise service_error(service_error.internal,
1850                    "Access map and authorizer out of sync in " + \
1851                            "create_experiment for fedid %s"  % fid)
1852
1853        pid = "dummy"
1854        gid = "dummy"
1855        try:
1856            os.mkdir(tmpdir+"/keys")
1857        except OSError:
1858            raise service_error(service_error.internal,
1859                    "Can't make temporary dir")
1860
1861        req = req.get('CreateRequestBody', None)
1862        if not req:
1863            raise service_error(service_error.req,
1864                    "Bad request format (no CreateRequestBody)")
1865        # The tcl parser needs to read a file so put the content into that file
1866        descr=req.get('experimentdescription', None)
1867        if descr:
1868            file_content=descr.get('ns2description', None)
1869            if file_content:
1870                try:
1871                    f = open(tclfile, 'w')
1872                    f.write(file_content)
1873                    f.close()
1874                except IOError:
1875                    raise service_error(service_error.internal,
1876                            "Cannot write temp experiment description")
1877            else:
1878                raise service_error(service_error.req, 
1879                        "Only ns2descriptions supported")
1880        else:
1881            raise service_error(service_error.req, "No experiment description")
1882
1883        # Generate an ID for the experiment (slice) and a certificate that the
1884        # allocator can use to prove they own it.  We'll ship it back through
1885        # the encrypted connection.
1886        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1887
1888        if req.has_key('experimentID') and \
1889                req['experimentID'].has_key('localname'):
1890            overwrite = False
1891            eid = req['experimentID']['localname']
1892            # If there's an old failed experiment here with the same local name
1893            # and accessible by this user, we'll overwrite it, otherwise we'll
1894            # fall through and do the collision avoidance.
1895            old_expid = self.get_experiment_fedid(eid)
1896            if old_expid and self.check_experiment_access(fid, old_expid):
1897                self.state_lock.acquire()
1898                status = self.state[eid].get('experimentStatus', None)
1899                if status and status == 'failed':
1900                    # remove the old access attribute
1901                    self.auth.unset_attribute(fid, old_expid)
1902                    overwrite = True
1903                    del self.state[eid]
1904                    del self.state[old_expid]
1905                self.state_lock.release()
1906            self.state_lock.acquire()
1907            while (self.state.has_key(eid) and not overwrite):
1908                eid += random.choice(string.ascii_letters)
1909            # Initial state
1910            self.state[eid] = {
1911                    'experimentID' : \
1912                            [ { 'localname' : eid }, {'fedid': expid } ],
1913                    'experimentStatus': 'starting',
1914                    'experimentAccess': { 'X509' : expcert },
1915                    'owner': fid,
1916                    'log' : [],
1917                }
1918            self.state[expid] = self.state[eid]
1919            if self.state_filename: self.write_state()
1920            self.state_lock.release()
1921        else:
1922            eid = self.exp_stem
1923            for i in range(0,5):
1924                eid += random.choice(string.ascii_letters)
1925            self.state_lock.acquire()
1926            while (self.state.has_key(eid)):
1927                eid = self.exp_stem
1928                for i in range(0,5):
1929                    eid += random.choice(string.ascii_letters)
1930            # Initial state
1931            self.state[eid] = {
1932                    'experimentID' : \
1933                            [ { 'localname' : eid }, {'fedid': expid } ],
1934                    'experimentStatus': 'starting',
1935                    'experimentAccess': { 'X509' : expcert },
1936                    'owner': fid,
1937                    'log' : [],
1938                }
1939            self.state[expid] = self.state[eid]
1940            if self.state_filename: self.write_state()
1941            self.state_lock.release()
1942
1943        try: 
1944            # This catches exceptions to clear the placeholder if necessary
1945            try:
1946                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1947            except ValueError:
1948                raise service_error(service_error.server_config, 
1949                        "Bad key type (%s)" % self.ssh_type)
1950
1951            user = req.get('user', None)
1952            if user == None:
1953                raise service_error(service_error.req, "No user")
1954
1955            master = req.get('master', None)
1956            if not master:
1957                raise service_error(service_error.req,
1958                        "No master testbed label")
1959            export_project = req.get('exportProject', None)
1960            if not export_project:
1961                raise service_error(service_error.req, "No export project")
1962           
1963            if self.splitter_url:
1964                self.log.debug("Calling remote splitter at %s" % \
1965                        self.splitter_url)
1966                split_data = self.remote_splitter(self.splitter_url,
1967                        file_content, master)
1968            else:
1969                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1970                    str(self.muxmax), '-m', master]
1971
1972                if self.fedkit:
1973                    tclcmd.append('-k')
1974
1975                if self.gatewaykit:
1976                    tclcmd.append('-K')
1977
1978                tclcmd.extend([pid, gid, eid, tclfile])
1979
1980                self.log.debug("running local splitter %s", " ".join(tclcmd))
1981                # This is just fantastic.  As a side effect the parser copies
1982                # tb_compat.tcl into the current directory, so that directory
1983                # must be writable by the fedd user.  Doing this in the
1984                # temporary subdir ensures this is the case.
1985                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1986                        cwd=tmpdir)
1987                split_data = tclparser.stdout
1988
1989            allocated = { }         # Testbeds we can access
1990            # Objects to parse the splitter output (defined above)
1991            parse_current_testbed = self.current_testbed(eid, tmpdir,
1992                    self.fedkit, self.gatewaykit)
1993            parse_allbeds = self.allbeds(self.get_access)
1994            parse_gateways = self.gateways(eid, master, tmpdir,
1995                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1996                    self.fedkit)
1997            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1998                        "^#\s+End\s+Vtopo")
1999            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
2000                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
2001            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
2002                    "^#\s+End\s+tarfiles")
2003            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
2004                    "^#\s+End\s+rpms")
2005
2006            # Working on the split data
2007            for line in split_data:
2008                line = line.rstrip()
2009                if parse_current_testbed(line, master, allocated, tbparams):
2010                    continue
2011                elif parse_allbeds(line, user, tbparams, master, export_project,
2012                        access_user):
2013                    continue
2014                elif parse_gateways(line, allocated, tbparams):
2015                    continue
2016                elif parse_vtopo(line):
2017                    continue
2018                elif parse_hostnames(line):
2019                    continue
2020                elif parse_tarfiles(line):
2021                    continue
2022                elif parse_rpms(line):
2023                    continue
2024                else:
2025                    raise service_error(service_error.internal, 
2026                            "Bad tcl parse? %s" % line)
2027            # Virtual topology and visualization
2028            vtopo = self.gentopo(parse_vtopo.str)
2029            if not vtopo:
2030                raise service_error(service_error.internal, 
2031                        "Failed to generate virtual topology")
2032
2033            vis = self.genviz(vtopo)
2034            if not vis:
2035                raise service_error(service_error.internal, 
2036                        "Failed to generate visualization")
2037
2038           
2039            # save federant information
2040            for k in allocated.keys():
2041                tbparams[k]['federant'] = {\
2042                        'name': [ { 'localname' : eid} ],\
2043                        'emulab': tbparams[k]['emulab'],\
2044                        'allocID' : tbparams[k]['allocID'],\
2045                        'master' : k == master,\
2046                    }
2047
2048            self.state_lock.acquire()
2049            self.state[eid]['vtopo'] = vtopo
2050            self.state[eid]['vis'] = vis
2051            self.state[expid]['federant'] = \
2052                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2053                        if tbparams[tb].has_key('federant') ]
2054            if self.state_filename: self.write_state()
2055            self.state_lock.release()
2056
2057            # Copy tarfiles and rpms needed at remote sites into a staging area
2058            try:
2059                if self.fedkit:
2060                    for t in self.fedkit:
2061                        parse_tarfiles.list.append(t[1])
2062                if self.gatewaykit:
2063                    for t in self.gatewaykit:
2064                        parse_tarfiles.list.append(t[1])
2065                for t in parse_tarfiles.list:
2066                    if not os.path.exists("%s/tarfiles" % tmpdir):
2067                        os.mkdir("%s/tarfiles" % tmpdir)
2068                    self.copy_file(t, "%s/tarfiles/%s" % \
2069                            (tmpdir, os.path.basename(t)))
2070                for r in parse_rpms.list:
2071                    if not os.path.exists("%s/rpms" % tmpdir):
2072                        os.mkdir("%s/rpms" % tmpdir)
2073                    self.copy_file(r, "%s/rpms/%s" % \
2074                            (tmpdir, os.path.basename(r)))
2075                # A null experiment file in case we need to create a remote
2076                # experiment from scratch
2077                f = open("%s/null.tcl" % tmpdir, "w")
2078                print >>f, """
2079set ns [new Simulator]
2080source tb_compat.tcl
2081
2082set a [$ns node]
2083
2084$ns rtproto Session
2085$ns run
2086"""
2087                f.close()
2088
2089            except IOError, e:
2090                raise service_error(service_error.internal, 
2091                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2092
2093        except service_error, e:
2094            # If something goes wrong in the parse (usually an access error)
2095            # clear the placeholder state.  From here on out the code delays
2096            # exceptions.  Failing at this point returns a fault to the remote
2097            # caller.
2098            self.state_lock.acquire()
2099            del self.state[eid]
2100            del self.state[expid]
2101            if self.state_filename: self.write_state()
2102            self.state_lock.release()
2103            raise e
2104
2105
2106        # Start the background swapper and return the starting state.  From
2107        # here on out, the state will stick around a while.
2108
2109        # Let users touch the state
2110        self.auth.set_attribute(fid, expid)
2111        self.auth.set_attribute(expid, expid)
2112        # Override fedids can manipulate state as well
2113        for o in self.overrides:
2114            self.auth.set_attribute(o, expid)
2115
2116        # Create a logger that logs to the experiment's state object as well as
2117        # to the main log file.
2118        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2119        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2120        # XXX: there should be a global one of these rather than repeating the
2121        # code.
2122        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2123                    '%d %b %y %H:%M:%S'))
2124        alloc_log.addHandler(h)
2125       
2126        # Start a thread to do the resource allocation
2127        t  = Thread(target=self.allocate_resources,
2128                args=(allocated, master, eid, expid, expcert, tbparams, 
2129                    tmpdir, alloc_log),
2130                name=eid)
2131        t.start()
2132
2133        rv = {
2134                'experimentID': [
2135                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2136                ],
2137                'experimentStatus': 'starting',
2138                'experimentAccess': { 'X509' : expcert }
2139            }
2140
2141        return rv
2142
2143    class new_start_segment:
2144        def __init__(self, debug=False, log=None, cert_file=None,
2145                cert_pwd=None, trusted_certs=None, caller=None):
2146            self.log = log
2147            self.debug = debug
2148            self.cert_file = cert_file
2149            self.cert_pwd = cert_pwd
2150            self.trusted_certs = None
2151            self.caller = caller
2152
2153        def __call__(self, uri, aid, topo, master, attrs=None):
2154            req = {
2155                    'allocID': { 'fedid' : aid }, 
2156                    'segmentdescription': { 
2157                        'topdldescription': topo.to_dict(),
2158                    },
2159                    'master': master,
2160                }
2161            if attrs:
2162                req['fedAttr'] = attrs
2163
2164            print req
2165            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
2166                    self.trusted_certs)
2167            print r
2168            return True
2169
2170
2171   
2172
2173    def new_allocate_resources(self, allocated, master, eid, expid, expcert, 
2174            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
2175        started = { }           # Testbeds where a sub-experiment started
2176                                # successfully
2177
2178        # XXX
2179        fail_soft = False
2180
2181        log = alloc_log or self.log
2182
2183        thread_pool = self.thread_pool(self.nthreads)
2184        threads = [ ]
2185
2186        for tb in [ k for k in allocated.keys() if k != master]:
2187            # Create and start a thread to start the segment, and save it to
2188            # get the return value later
2189            thread_pool.wait_for_slot()
2190            uri = self.tbmap.get(tb, None)
2191            if not uri:
2192                raise service_error(service_error.internal, 
2193                        "Unknown testbed %s !?" % tb)
2194
2195            if tbparams[tb].has_key('allocID') and \
2196                    tbparams[tb]['allocID'].has_key('fedid'):
2197                aid = tbparams[tb]['allocID']['fedid']
2198            else:
2199                raise service_error(service_error.internal, 
2200                        "No alloc id for testbed %s !?" % tb)
2201
2202            t  = self.pooled_thread(\
2203                    target=self.new_start_segment(log=log, debug=self.debug,
2204                        cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2205                        trusted_certs=self.trusted_certs,
2206                        caller=self.call_StartSegment), 
2207                    args=(uri, aid, topo[tb], False, attrs), name=tb,
2208                    pdata=thread_pool, trace_file=self.trace_file)
2209            threads.append(t)
2210            t.start()
2211
2212        # Wait until all finish
2213        thread_pool.wait_for_all_done()
2214
2215        # If none failed, start the master
2216        failed = [ t.getName() for t in threads if not t.rv ]
2217
2218        if len(failed) == 0:
2219            uri = self.tbmap.get(master, None)
2220            if not uri:
2221                raise service_error(service_error.internal, 
2222                        "Unknown testbed %s !?" % master)
2223
2224            if tbparams[master].has_key('allocID') and \
2225                    tbparams[master]['allocID'].has_key('fedid'):
2226                aid = tbparams[master]['allocID']['fedid']
2227            else:
2228                raise service_error(service_error.internal, 
2229                    "No alloc id for testbed %s !?" % master)
2230            starter = self.new_start_segment(log=log, debug=self.debug,
2231                    cert_file=self.cert_file, cert_pwd=self.cert_pwd,
2232                    trusted_certs=self.trusted_certs,
2233                    caller=self.call_StartSegment)
2234            if not starter(uri, aid, topo[master], True, attrs):
2235                failed.append(master)
2236
2237        succeeded = [tb for tb in allocated.keys() if tb not in failed]
2238        # If one failed clean up, unless fail_soft is set
2239        if failed and False:
2240            if not fail_soft:
2241                thread_pool.clear()
2242                for tb in succeeded:
2243                    # Create and start a thread to stop the segment
2244                    thread_pool.wait_for_slot()
2245                    t  = self.pooled_thread(\
2246                            target=self.stop_segment(log=log,
2247                                keyfile=self.ssh_privkey_file,
2248                                debug=self.debug), 
2249                            args=(tb, eid, tbparams), name=tb,
2250                            pdata=thread_pool, trace_file=self.trace_file)
2251                    t.start()
2252                # Wait until all finish
2253                thread_pool.wait_for_all_done()
2254
2255                # release the allocations
2256                for tb in tbparams.keys():
2257                    self.release_access(tb, tbparams[tb]['allocID'])
2258                # Remove the placeholder
2259                self.state_lock.acquire()
2260                self.state[eid]['experimentStatus'] = 'failed'
2261                if self.state_filename: self.write_state()
2262                self.state_lock.release()
2263
2264                log.error("Swap in failed on %s" % ",".join(failed))
2265                return
2266        else:
2267            log.info("[start_segment]: Experiment %s active" % eid)
2268
2269        log.debug("[start_experiment]: removing %s" % tmpdir)
2270
2271        # Walk up tmpdir, deleting as we go
2272        for path, dirs, files in os.walk(tmpdir, topdown=False):
2273            for f in files:
2274                os.remove(os.path.join(path, f))
2275            for d in dirs:
2276                os.rmdir(os.path.join(path, d))
2277        os.rmdir(tmpdir)
2278
2279        # Insert the experiment into our state and update the disk copy
2280        self.state_lock.acquire()
2281        self.state[expid]['experimentStatus'] = 'active'
2282        self.state[eid] = self.state[expid]
2283        if self.state_filename: self.write_state()
2284        self.state_lock.release()
2285        return
2286
2287
2288    def new_create_experiment(self, req, fid):
2289        """
2290        The external interface to experiment creation called from the
2291        dispatcher.
2292
2293        Creates a working directory, splits the incoming description using the
2294        splitter script and parses out the avrious subsections using the
2295        lcasses above.  Once each sub-experiment is created, use pooled threads
2296        to instantiate them and start it all up.
2297        """
2298
2299        def add_kit(e, kit):
2300            """
2301            Add a Software object created from the list of (install, location)
2302            tuples passed as kit  to the software attribute of an object e.  We
2303            do this enough to break out the code, but it's kind of a hack to
2304            avoid changing the old tuple rep.
2305            """
2306
2307            s = [ topdl.Software(install=i, location=l) for i, l in kit]
2308
2309            if isinstance(e.software, list): e.software.extend(s)
2310            else: e.software = s
2311
2312
2313        if not self.auth.check_attribute(fid, 'create'):
2314            raise service_error(service_error.access, "Create access denied")
2315
2316        try:
2317            tmpdir = tempfile.mkdtemp(prefix="split-")
2318        except IOError:
2319            raise service_error(service_error.internal, "Cannot create tmp dir")
2320
2321        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
2322        gw_secretkey_base = "fed.%s" % self.ssh_type
2323        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
2324        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
2325        tclfile = tmpdir + "/experiment.tcl"
2326        tbparams = { }
2327        try:
2328            access_user = self.accessdb[fid]
2329        except KeyError:
2330            raise service_error(service_error.internal,
2331                    "Access map and authorizer out of sync in " + \
2332                            "create_experiment for fedid %s"  % fid)
2333
2334        pid = "dummy"
2335        gid = "dummy"
2336        try:
2337            os.mkdir(tmpdir+"/keys")
2338        except OSError:
2339            raise service_error(service_error.internal,
2340                    "Can't make temporary dir")
2341
2342        req = req.get('CreateRequestBody', None)
2343        if not req:
2344            raise service_error(service_error.req,
2345                    "Bad request format (no CreateRequestBody)")
2346        # The tcl parser needs to read a file so put the content into that file
2347        descr=req.get('experimentdescription', None)
2348        if descr:
2349            file_content=descr.get('ns2description', None)
2350            if file_content:
2351                try:
2352                    f = open(tclfile, 'w')
2353                    f.write(file_content)
2354                    f.close()
2355                except IOError:
2356                    raise service_error(service_error.internal,
2357                            "Cannot write temp experiment description")
2358            else:
2359                raise service_error(service_error.req, 
2360                        "Only ns2descriptions supported")
2361        else:
2362            raise service_error(service_error.req, "No experiment description")
2363
2364        # Generate an ID for the experiment (slice) and a certificate that the
2365        # allocator can use to prove they own it.  We'll ship it back through
2366        # the encrypted connection.
2367        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
2368
2369        if req.has_key('experimentID') and \
2370                req['experimentID'].has_key('localname'):
2371            overwrite = False
2372            eid = req['experimentID']['localname']
2373            # If there's an old failed experiment here with the same local name
2374            # and accessible by this user, we'll overwrite it, otherwise we'll
2375            # fall through and do the collision avoidance.
2376            old_expid = self.get_experiment_fedid(eid)
2377            if old_expid and self.check_experiment_access(fid, old_expid):
2378                self.state_lock.acquire()
2379                status = self.state[eid].get('experimentStatus', None)
2380                if status and status == 'failed':
2381                    # remove the old access attribute
2382                    self.auth.unset_attribute(fid, old_expid)
2383                    overwrite = True
2384                    del self.state[eid]
2385                    del self.state[old_expid]
2386                self.state_lock.release()
2387            self.state_lock.acquire()
2388            while (self.state.has_key(eid) and not overwrite):
2389                eid += random.choice(string.ascii_letters)
2390            # Initial state
2391            self.state[eid] = {
2392                    'experimentID' : \
2393                            [ { 'localname' : eid }, {'fedid': expid } ],
2394                    'experimentStatus': 'starting',
2395                    'experimentAccess': { 'X509' : expcert },
2396                    'owner': fid,
2397                    'log' : [],
2398                }
2399            self.state[expid] = self.state[eid]
2400            if self.state_filename: self.write_state()
2401            self.state_lock.release()
2402        else:
2403            eid = self.exp_stem
2404            for i in range(0,5):
2405                eid += random.choice(string.ascii_letters)
2406            self.state_lock.acquire()
2407            while (self.state.has_key(eid)):
2408                eid = self.exp_stem
2409                for i in range(0,5):
2410                    eid += random.choice(string.ascii_letters)
2411            # Initial state
2412            self.state[eid] = {
2413                    'experimentID' : \
2414                            [ { 'localname' : eid }, {'fedid': expid } ],
2415                    'experimentStatus': 'starting',
2416                    'experimentAccess': { 'X509' : expcert },
2417                    'owner': fid,
2418                    'log' : [],
2419                }
2420            self.state[expid] = self.state[eid]
2421            if self.state_filename: self.write_state()
2422            self.state_lock.release()
2423
2424        try: 
2425            # This catches exceptions to clear the placeholder if necessary
2426            try:
2427                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2428            except ValueError:
2429                raise service_error(service_error.server_config, 
2430                        "Bad key type (%s)" % self.ssh_type)
2431
2432            user = req.get('user', None)
2433            if user == None:
2434                raise service_error(service_error.req, "No user")
2435
2436            master = req.get('master', None)
2437            if not master:
2438                raise service_error(service_error.req,
2439                        "No master testbed label")
2440            export_project = req.get('exportProject', None)
2441            if not export_project:
2442                raise service_error(service_error.req, "No export project")
2443           
2444            if self.splitter_url:
2445                self.log.debug("Calling remote splitter at %s" % \
2446                        self.splitter_url)
2447                split_data = self.remote_splitter(self.splitter_url,
2448                        file_content, master)
2449            else:
2450                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2451                    str(self.muxmax), '-m', master]
2452
2453                if self.fedkit:
2454                    tclcmd.append('-k')
2455
2456                if self.gatewaykit:
2457                    tclcmd.append('-K')
2458
2459                tclcmd.extend([pid, gid, eid, tclfile])
2460
2461                self.log.debug("running local splitter %s", " ".join(tclcmd))
2462                # This is just fantastic.  As a side effect the parser copies
2463                # tb_compat.tcl into the current directory, so that directory
2464                # must be writable by the fedd user.  Doing this in the
2465                # temporary subdir ensures this is the case.
2466                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2467                        cwd=tmpdir)
2468                split_data = tclparser.stdout
2469
2470            allocated = { }         # Testbeds we can access
2471            # Allocate IP addresses: The allocator is a buddy system memory
2472            # allocator.  Allocate from the largest substrate to the
2473            # smallest to make the packing more likely to work - i.e.
2474            # avoiding internal fragmentation.
2475            top = topdl.topology_from_xml(file=split_data, top="experiment")
2476            subs = sorted(top.substrates, 
2477                    cmp=lambda x,y: cmp(len(x.interfaces), 
2478                        len(y.interfaces)),
2479                    reverse=True)
2480            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
2481            ifs = { }
2482            hosts = [ ]
2483            # The config urlpath
2484            configpath = "/%s/config" % expid
2485            # The config file system location
2486            configdir ="%s%s" % ( self.repodir, configpath)
2487
2488            for idx, s in enumerate(subs):
2489                a = ips.allocate(len(s.interfaces)+2)
2490                if a :
2491                    base, num = a
2492                    if num < len(s.interfaces) +2 : 
2493                        raise service_error(service_error.internal,
2494                                "Allocator returned wrong number of IPs??")
2495                else:
2496                    raise service_error(service_error.req, 
2497                            "Cannot allocate IP addresses")
2498
2499                base += 1
2500                for i in s.interfaces:
2501                    i.attribute.append(
2502                            topdl.Attribute('ip4_address', 
2503                                "%s" % ip_addr(base)))
2504                    hname = i.element.name[0]
2505                    if ifs.has_key(hname):
2506                        hosts.append("%s\t%s-%s %s-%d" % \
2507                                (ip_addr(base), hname, s.name, hname,
2508                                    ifs[hname]))
2509                    else:
2510                        ifs[hname] = 0
2511                        hosts.append("%s\t%s-%s %s-%d %s" % \
2512                                (ip_addr(base), hname, s.name, hname,
2513                                    ifs[hname], hname))
2514
2515                    ifs[hname] += 1
2516                    base += 1
2517            # save config files
2518            try:
2519                os.makedirs(configdir)
2520            except IOError, e:
2521                raise service_error(
2522                        "Cannot create config directory: %s" % e)
2523            # Find the testbeds to look up
2524            testbeds = set([ a.value for e in top.elements \
2525                    for a in e.attribute \
2526                        if a.attribute == 'testbed'] )
2527
2528
2529            # Make per testbed topologies.  Copy the main topo and remove
2530            # interfaces and nodes that don't live in the testbed.
2531            topo ={ }
2532            for tb in testbeds:
2533                self.get_access(tb, None, user, tbparams, master,
2534                        export_project, access_user)
2535                allocated[tb] = 1
2536                topo[tb] = top.clone()
2537                to_delete = [ ]
2538                for e in topo[tb].elements:
2539                    etb = e.get_attribute('testbed')
2540                    if etb and etb != tb:
2541                        for i in e.interface:
2542                            for s in i.subs:
2543                                try:
2544                                    s.interfaces.remove(i)
2545                                except ValueError:
2546                                    raise service_error(service_error.internal,
2547                                            "Can't remove interface??")
2548                        to_delete.append(e)
2549                for e in to_delete:
2550                    topo[tb].elements.remove(e)
2551                topo[tb].make_indices()
2552
2553                for e in topo[tb].elements:
2554                    if tb == master:
2555                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
2556                    else:
2557                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
2558                    scmd = e.get_attribute('startup')
2559                    if scmd:
2560                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
2561
2562                    e.set_attribute('startup', cmd)
2563                    if self.fedkit: add_kit(e, self.fedkit)
2564
2565            # Copy configuration files into the remote file store
2566            try:
2567                f = open("%s/hosts" % configdir, "w")
2568                f.write('\n'.join(hosts))
2569                f.close()
2570            except IOError, e:
2571                raise service_error(service_error.internal, 
2572                        "Cannot write hosts file: %s" % e)
2573            try:
2574                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
2575                        (configdir, gw_pubkey_base))
2576                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
2577                        (configdir, gw_secretkey_base))
2578            except IOError, e:
2579                raise service_error(service_error.internal, 
2580                        "Cannot copy keyfiles: %s" % e)
2581
2582            # Allow the individual testbeds to access the configuration files.
2583            for tb in tbparams.keys():
2584                asignee = tbparams[tb]['allocID']['fedid']
2585                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2586                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2587                    print "assigned %s/%s" % (configpath, f)
2588
2589            # Now, for each substrate in the main topology, find those that
2590            # have nodes on more than one testbed.  Insert portal nodes
2591            # into the copies of those substrates on the sub topologies.
2592            for s in top.substrates:
2593                # tbs will contain an ip address on this subsrate that is in
2594                # each testbed.
2595                tbs = { }
2596                for i in s.interfaces:
2597                    e = i.element
2598                    tb = e.get_attribute('testbed')
2599                    if tb and not tbs.has_key(tb):
2600                        for i in e.interface:
2601                            if s in i.subs:
2602                                tbs[tb]= i.get_attribute('ip4_address')
2603                if len(tbs) < 2:
2604                    continue
2605
2606                # More than one testbed is on this substrate.  Insert
2607                # some portals into the subtopologies.  st == source testbed,
2608                # dt == destination testbed.
2609                segment_substrate = { }
2610                for st in tbs.keys():
2611                    segment_substrate[st] = { }
2612                    for dt in [ t for t in tbs.keys() if t != st]:
2613                        myname =  "%stunnel" % dt
2614                        desthost  =  "%stunnel" % st
2615                        sproject = tbparams[st].get('project', 'project')
2616                        dproject = tbparams[dt].get('project', 'project')
2617                        mproject = tbparams[master].get('project', 'project')
2618                        sdomain = tbparams[st].get('domain', ".example.com")
2619                        ddomain = tbparams[dt].get('domain', ".example.com")
2620                        mdomain = tbparams[master].get('domain', '.example.com')
2621                        # XXX: active and type need to be unkludged
2622                        active = ("%s" % (st == master))
2623                        if not segment_substrate[st].has_key(dt):
2624                            # Put a substrate and a segment for the connected
2625                            # testbed in there.
2626                            tsubstrate = \
2627                                    topdl.Substrate(name='%s-%s' % (st, dt),
2628                                            attribute= [
2629                                                topdl.Attribute(
2630                                                    attribute='portal',
2631                                                    value='true')
2632                                                ]
2633                                            )
2634                            segment_element = topdl.Segment(
2635                                    id= tbparams[dt]['allocID'],
2636                                    type='emulab',
2637                                    uri = self.tbmap.get(dt, None),
2638                                    interface=[ 
2639                                        topdl.Interface(
2640                                            substrate=tsubstrate.name),
2641                                        ],
2642                                    attribute = [
2643                                        topdl.Attribute(attribute=n, value=v)
2644                                            for n, v in (\
2645                                                ('domain', ddomain),
2646                                                ('experiment', "%s/%s" % \
2647                                                        (dproject, eid)),)
2648                                        ],
2649                                    )
2650                            segment_substrate[st][dt] = tsubstrate
2651                            topo[st].substrates.append(tsubstrate)
2652                            topo[st].elements.append(segment_element)
2653                        portal = topdl.Computer(
2654                                name="%stunnel" % dt,
2655                                attribute=[ 
2656                                    topdl.Attribute(attribute=n,value=v)
2657                                        for n, v in (\
2658                                            ('portal', 'true'),
2659                                            ('domain', sdomain),
2660                                            ('masterdomain', mdomain),
2661                                            ('masterexperiment', "%s/%s" % \
2662                                                    (mproject, eid)),
2663                                            ('experiment', "%s/%s" % \
2664                                                    (sproject, eid)),
2665                                            ('peer', "%s" % desthost),
2666                                            ('peer_segment', "%s" % \
2667                                                    tbparams[dt]['allocID']['fedid']),
2668                                            ('scriptdir', 
2669                                                "/usr/local/federation/bin"),
2670                                            ('active', "%s" % active),
2671                                            ('portal_type', 'both'), 
2672                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
2673                                    ],
2674                                interface=[
2675                                    topdl.Interface(
2676                                        substrate=s.name,
2677                                        attribute=[ 
2678                                            topdl.Attribute(
2679                                                attribute='ip4_addreess', 
2680                                                value=tbs[dt]
2681                                            )
2682                                        ]),
2683                                    topdl.Interface(
2684                                        substrate=\
2685                                            segment_substrate[st][dt].name,
2686                                        attribute=[
2687                                            topdl.Attribute(attribute='portal',
2688                                                value='true')
2689                                            ]
2690                                        ),
2691                                    ],
2692                                )
2693                        if self.fedkit: add_kit(portal, self.fedkit)
2694                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
2695
2696                        topo[st].elements.append(portal)
2697
2698            # Connect the gateway nodes into the topologies and clear out
2699            # substrates that are not in the topologies
2700            for tb in testbeds:
2701                topo[tb].incorporate_elements()
2702                topo[tb].substrates = \
2703                        [s for s in topo[tb].substrates \
2704                            if len(s.interfaces) >0]
2705
2706            # Copy the rpms and tarfiles to a distribution directory from
2707            # which the federants can retrieve them
2708            linkpath = "%s/software" %  expid
2709            softdir ="%s/%s" % ( self.repodir, linkpath)
2710            softmap = { }
2711            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
2712                    for p, t in l ])
2713            pkgs.update([x.location for e in top.elements \
2714                    for x in e.software])
2715            try:
2716                os.makedirs(softdir)
2717            except IOError, e:
2718                raise service_error(
2719                        "Cannot create software directory: %s" % e)
2720            for pkg in pkgs:
2721                loc = pkg
2722
2723                scheme, host, path = urlparse(loc)[0:3]
2724                dest = os.path.basename(path)
2725                if not scheme:
2726                    if not loc.startswith('/'):
2727                        loc = "/%s" % loc
2728                    loc = "file://%s" %loc
2729                try:
2730                    u = urlopen(loc)
2731                except Exception, e:
2732                    raise service_error(service_error.req, 
2733                            "Cannot open %s: %s" % (loc, e))
2734                try:
2735                    f = open("%s/%s" % (softdir, dest) , "w")
2736                    self.log.debug("Writing %s/%s" % (softdir,dest) )
2737                    data = u.read(4096)
2738                    while data:
2739                        f.write(data)
2740                        data = u.read(4096)
2741                    f.close()
2742                    u.close()
2743                except Exception, e:
2744                    raise service_error(service_error.internal,
2745                            "Could not copy %s: %s" % (loc, e))
2746                path = re.sub("/tmp", "", linkpath)
2747                # XXX
2748                softmap[pkg] = \
2749                        "https://users.isi.deterlab.net:23232/%s/%s" %\
2750                        ( path, dest)
2751
2752                # Allow the individual testbeds to access the software.
2753                for tb in tbparams.keys():
2754                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
2755                            "/%s/%s" % ( path, dest))
2756
2757            # Convert the software locations in the segments into the local
2758            # copies on this host
2759            for soft in [ s for tb in topo.values() \
2760                    for e in tb.elements \
2761                        if getattr(e, 'software', False) \
2762                            for s in e.software ]:
2763                if softmap.has_key(soft.location):
2764                    soft.location = softmap[soft.location]
2765
2766            vtopo = topdl.topology_to_vtopo(top)
2767            vis = self.genviz(vtopo)
2768
2769            # save federant information
2770            for k in allocated.keys():
2771                tbparams[k]['federant'] = {\
2772                        'name': [ { 'localname' : eid} ],\
2773                        'emulab': tbparams[k]['emulab'],\
2774                        'allocID' : tbparams[k]['allocID'],\
2775                        'master' : k == master,\
2776                    }
2777
2778            self.state_lock.acquire()
2779            self.state[eid]['vtopo'] = vtopo
2780            self.state[eid]['vis'] = vis
2781            self.state[expid]['federant'] = \
2782                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2783                        if tbparams[tb].has_key('federant') ]
2784            if self.state_filename: 
2785                self.write_state()
2786            self.state_lock.release()
2787        except service_error, e:
2788            # If something goes wrong in the parse (usually an access error)
2789            # clear the placeholder state.  From here on out the code delays
2790            # exceptions.  Failing at this point returns a fault to the remote
2791            # caller.
2792
2793            self.state_lock.acquire()
2794            del self.state[eid]
2795            del self.state[expid]
2796            if self.state_filename: self.write_state()
2797            self.state_lock.release()
2798            raise e
2799
2800
2801        # Start the background swapper and return the starting state.  From
2802        # here on out, the state will stick around a while.
2803
2804        # Let users touch the state
2805        self.auth.set_attribute(fid, expid)
2806        self.auth.set_attribute(expid, expid)
2807        # Override fedids can manipulate state as well
2808        for o in self.overrides:
2809            self.auth.set_attribute(o, expid)
2810
2811        # Create a logger that logs to the experiment's state object as well as
2812        # to the main log file.
2813        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2814        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2815        # XXX: there should be a global one of these rather than repeating the
2816        # code.
2817        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2818                    '%d %b %y %H:%M:%S'))
2819        alloc_log.addHandler(h)
2820       
2821        # XXX
2822        url_base = 'https://users.isi.deterlab.net:23232'
2823        attrs = [ 
2824                {
2825                    'attribute': 'ssh_pubkey', 
2826                    'value': '%s/%s/config/%s' % \
2827                            (url_base, expid, gw_pubkey_base)
2828                },
2829                {
2830                    'attribute': 'ssh_secretkey', 
2831                    'value': '%s/%s/config/%s' % \
2832                            (url_base, expid, gw_secretkey_base)
2833                },
2834                {
2835                    'attribute': 'hosts', 
2836                    'value': '%s/%s/config/hosts' % \
2837                            (url_base, expid)
2838                },
2839                {
2840                    'attribute': 'experiment_name',
2841                    'value': eid,
2842                },
2843            ]
2844
2845        # Start a thread to do the resource allocation
2846        t  = Thread(target=self.new_allocate_resources,
2847                args=(allocated, master, eid, expid, expcert, tbparams, 
2848                    topo, tmpdir, alloc_log, attrs),
2849                name=eid)
2850        t.start()
2851
2852        rv = {
2853                'experimentID': [
2854                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2855                ],
2856                'experimentStatus': 'starting',
2857                'experimentAccess': { 'X509' : expcert }
2858            }
2859
2860        return rv
2861   
2862    def get_experiment_fedid(self, key):
2863        """
2864        find the fedid associated with the localname key in the state database.
2865        """
2866
2867        rv = None
2868        self.state_lock.acquire()
2869        if self.state.has_key(key):
2870            if isinstance(self.state[key], dict):
2871                try:
2872                    kl = [ f['fedid'] for f in \
2873                            self.state[key]['experimentID']\
2874                                if f.has_key('fedid') ]
2875                except KeyError:
2876                    self.state_lock.release()
2877                    raise service_error(service_error.internal, 
2878                            "No fedid for experiment %s when getting "+\
2879                                    "fedid(!?)" % key)
2880                if len(kl) == 1:
2881                    rv = kl[0]
2882                else:
2883                    self.state_lock.release()
2884                    raise service_error(service_error.internal, 
2885                            "multiple fedids for experiment %s when " +\
2886                                    "getting fedid(!?)" % key)
2887            else:
2888                self.state_lock.release()
2889                raise service_error(service_error.internal, 
2890                        "Unexpected state for %s" % key)
2891        self.state_lock.release()
2892        return rv
2893
2894    def check_experiment_access(self, fid, key):
2895        """
2896        Confirm that the fid has access to the experiment.  Though a request
2897        may be made in terms of a local name, the access attribute is always
2898        the experiment's fedid.
2899        """
2900        if not isinstance(key, fedid):
2901            key = self.get_experiment_fedid(key)
2902
2903        if self.auth.check_attribute(fid, key):
2904            return True
2905        else:
2906            raise service_error(service_error.access, "Access Denied")
2907
2908
2909    def get_handler(self, path, fid):
2910        print "%s" %  path
2911        if self.auth.check_attribute(fid, path):
2912            return ("%s/%s" % (self.repodir, path), "application/binary")
2913        else:
2914            return (None, None)
2915
2916    def get_vtopo(self, req, fid):
2917        """
2918        Return the stored virtual topology for this experiment
2919        """
2920        rv = None
2921        state = None
2922
2923        req = req.get('VtopoRequestBody', None)
2924        if not req:
2925            raise service_error(service_error.req,
2926                    "Bad request format (no VtopoRequestBody)")
2927        exp = req.get('experiment', None)
2928        if exp:
2929            if exp.has_key('fedid'):
2930                key = exp['fedid']
2931                keytype = "fedid"
2932            elif exp.has_key('localname'):
2933                key = exp['localname']
2934                keytype = "localname"
2935            else:
2936                raise service_error(service_error.req, "Unknown lookup type")
2937        else:
2938            raise service_error(service_error.req, "No request?")
2939
2940        self.check_experiment_access(fid, key)
2941
2942        self.state_lock.acquire()
2943        if self.state.has_key(key):
2944            if self.state[key].has_key('vtopo'):
2945                rv = { 'experiment' : {keytype: key },\
2946                        'vtopo': self.state[key]['vtopo'],\
2947                    }
2948            else:
2949                state = self.state[key]['experimentStatus']
2950        self.state_lock.release()
2951
2952        if rv: return rv
2953        else: 
2954            if state:
2955                raise service_error(service_error.partial, 
2956                        "Not ready: %s" % state)
2957            else:
2958                raise service_error(service_error.req, "No such experiment")
2959
2960    def get_vis(self, req, fid):
2961        """
2962        Return the stored visualization for this experiment
2963        """
2964        rv = None
2965        state = None
2966
2967        req = req.get('VisRequestBody', None)
2968        if not req:
2969            raise service_error(service_error.req,
2970                    "Bad request format (no VisRequestBody)")
2971        exp = req.get('experiment', None)
2972        if exp:
2973            if exp.has_key('fedid'):
2974                key = exp['fedid']
2975                keytype = "fedid"
2976            elif exp.has_key('localname'):
2977                key = exp['localname']
2978                keytype = "localname"
2979            else:
2980                raise service_error(service_error.req, "Unknown lookup type")
2981        else:
2982            raise service_error(service_error.req, "No request?")
2983
2984        self.check_experiment_access(fid, key)
2985
2986        self.state_lock.acquire()
2987        if self.state.has_key(key):
2988            if self.state[key].has_key('vis'):
2989                rv =  { 'experiment' : {keytype: key },\
2990                        'vis': self.state[key]['vis'],\
2991                        }
2992            else:
2993                state = self.state[key]['experimentStatus']
2994        self.state_lock.release()
2995
2996        if rv: return rv
2997        else:
2998            if state:
2999                raise service_error(service_error.partial, 
3000                        "Not ready: %s" % state)
3001            else:
3002                raise service_error(service_error.req, "No such experiment")
3003
3004    def clean_info_response(self, rv):
3005        """
3006        Remove the information in the experiment's state object that is not in
3007        the info response.
3008        """
3009        # Remove the owner info (should always be there, but...)
3010        if rv.has_key('owner'): del rv['owner']
3011
3012        # Convert the log into the allocationLog parameter and remove the
3013        # log entry (with defensive programming)
3014        if rv.has_key('log'):
3015            rv['allocationLog'] = "".join(rv['log'])
3016            del rv['log']
3017        else:
3018            rv['allocationLog'] = ""
3019
3020        if rv['experimentStatus'] != 'active':
3021            if rv.has_key('federant'): del rv['federant']
3022        else:
3023            # remove the allocationID info from each federant
3024            for f in rv.get('federant', []):
3025                if f.has_key('allocID'): del f['allocID']
3026        return rv
3027
3028    def get_info(self, req, fid):
3029        """
3030        Return all the stored info about this experiment
3031        """
3032        rv = None
3033
3034        req = req.get('InfoRequestBody', None)
3035        if not req:
3036            raise service_error(service_error.req,
3037                    "Bad request format (no InfoRequestBody)")
3038        exp = req.get('experiment', None)
3039        if exp:
3040            if exp.has_key('fedid'):
3041                key = exp['fedid']
3042                keytype = "fedid"
3043            elif exp.has_key('localname'):
3044                key = exp['localname']
3045                keytype = "localname"
3046            else:
3047                raise service_error(service_error.req, "Unknown lookup type")
3048        else:
3049            raise service_error(service_error.req, "No request?")
3050
3051        self.check_experiment_access(fid, key)
3052
3053        # The state may be massaged by the service function that called
3054        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
3055        # state.
3056        self.state_lock.acquire()
3057        if self.state.has_key(key):
3058            rv = copy.deepcopy(self.state[key])
3059        self.state_lock.release()
3060
3061        if rv:
3062            return self.clean_info_response(rv)
3063        else:
3064            raise service_error(service_error.req, "No such experiment")
3065
3066    def get_multi_info(self, req, fid):
3067        """
3068        Return all the stored info that this fedid can access
3069        """
3070        rv = { 'info': [ ] }
3071
3072        self.state_lock.acquire()
3073        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
3074            self.check_experiment_access(fid, key)
3075
3076            if self.state.has_key(key):
3077                e = copy.deepcopy(self.state[key])
3078                e = self.clean_info_response(e)
3079                rv['info'].append(e)
3080        self.state_lock.release()
3081        return rv
3082
3083
3084    def terminate_experiment(self, req, fid):
3085        """
3086        Swap this experiment out on the federants and delete the shared
3087        information
3088        """
3089        tbparams = { }
3090        req = req.get('TerminateRequestBody', None)
3091        if not req:
3092            raise service_error(service_error.req,
3093                    "Bad request format (no TerminateRequestBody)")
3094        force = req.get('force', False)
3095        exp = req.get('experiment', None)
3096        if exp:
3097            if exp.has_key('fedid'):
3098                key = exp['fedid']
3099                keytype = "fedid"
3100            elif exp.has_key('localname'):
3101                key = exp['localname']
3102                keytype = "localname"
3103            else:
3104                raise service_error(service_error.req, "Unknown lookup type")
3105        else:
3106            raise service_error(service_error.req, "No request?")
3107
3108        self.check_experiment_access(fid, key)
3109
3110        dealloc_list = [ ]
3111
3112
3113        # Create a logger that logs to the dealloc_list as well as to the main
3114        # log file.
3115        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
3116        h = logging.StreamHandler(self.list_log(dealloc_list))
3117        # XXX: there should be a global one of these rather than repeating the
3118        # code.
3119        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
3120                    '%d %b %y %H:%M:%S'))
3121        dealloc_log.addHandler(h)
3122
3123        self.state_lock.acquire()
3124        fed_exp = self.state.get(key, None)
3125
3126        if fed_exp:
3127            # This branch of the conditional holds the lock to generate a
3128            # consistent temporary tbparams variable to deallocate experiments.
3129            # It releases the lock to do the deallocations and reacquires it to
3130            # remove the experiment state when the termination is complete.
3131
3132            # First make sure that the experiment creation is complete.
3133            status = fed_exp.get('experimentStatus', None)
3134
3135            if status:
3136                if status in ('starting', 'terminating'):
3137                    if not force:
3138                        self.state_lock.release()
3139                        raise service_error(service_error.partial, 
3140                                'Experiment still being created or destroyed')
3141                    else:
3142                        self.log.warning('Experiment in %s state ' % status + \
3143                                'being terminated by force.')
3144            else:
3145                # No status??? trouble
3146                self.state_lock.release()
3147                raise service_error(service_error.internal,
3148                        "Experiment has no status!?")
3149
3150            ids = []
3151            #  experimentID is a list of dicts that are self-describing
3152            #  identifiers.  This finds all the fedids and localnames - the
3153            #  keys of self.state - and puts them into ids.
3154            for id in fed_exp.get('experimentID', []):
3155                if id.has_key('fedid'): ids.append(id['fedid'])
3156                if id.has_key('localname'): ids.append(id['localname'])
3157
3158            # Construct enough of the tbparams to make the stop_segment calls
3159            # work
3160            for fed in fed_exp.get('federant', []):
3161                try:
3162                    for e in fed['name']:
3163                        eid = e.get('localname', None)
3164                        if eid: break
3165                    else:
3166                        continue
3167
3168                    p = fed['emulab']['project']
3169
3170                    project = p['name']['localname']
3171                    tb = p['testbed']['localname']
3172                    user = p['user'][0]['userID']['localname']
3173
3174                    domain = fed['emulab']['domain']
3175                    host  = fed['emulab']['ops']
3176                    aid = fed['allocID']
3177                except KeyError, e:
3178                    continue
3179                tbparams[tb] = {\
3180                        'user': user,\
3181                        'domain': domain,\
3182                        'project': project,\
3183                        'host': host,\
3184                        'eid': eid,\
3185                        'aid': aid,\
3186                    }
3187            fed_exp['experimentStatus'] = 'terminating'
3188            if self.state_filename: self.write_state()
3189            self.state_lock.release()
3190
3191            # Stop everyone.  NB, wait_for_all waits until a thread starts and
3192            # then completes, so we can't wait if nothing starts.  So, no
3193            # tbparams, no start.
3194            if len(tbparams) > 0:
3195                thread_pool = self.thread_pool(self.nthreads)
3196                for tb in tbparams.keys():
3197                    # Create and start a thread to stop the segment
3198                    thread_pool.wait_for_slot()
3199                    t  = self.pooled_thread(\
3200                            target=self.stop_segment(log=dealloc_log,
3201                                keyfile=self.ssh_privkey_file, debug=self.debug), 
3202                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
3203                            pdata=thread_pool, trace_file=self.trace_file)
3204                    t.start()
3205                # Wait for completions
3206                thread_pool.wait_for_all_done()
3207
3208            # release the allocations (failed experiments have done this
3209            # already, and starting experiments may be in odd states, so we
3210            # ignore errors releasing those allocations
3211            try: 
3212                for tb in tbparams.keys():
3213                    self.release_access(tb, tbparams[tb]['aid'])
3214            except service_error, e:
3215                if status != 'failed' and not force:
3216                    raise e
3217
3218            # Remove the terminated experiment
3219            self.state_lock.acquire()
3220            for id in ids:
3221                if self.state.has_key(id): del self.state[id]
3222
3223            if self.state_filename: self.write_state()
3224            self.state_lock.release()
3225
3226            return { 
3227                    'experiment': exp , 
3228                    'deallocationLog': "".join(dealloc_list),
3229                    }
3230        else:
3231            # Don't forget to release the lock
3232            self.state_lock.release()
3233            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.