source: fedd/federation/experiment_control.py @ eec716b

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since eec716b was 70caa72, checked in by Ted Faber <faber@…>, 15 years ago

Fix from 1.30 branch

  • Property mode set to 100644
File size: 91.2 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3441fe3]16import traceback
[c971895]17# For parsing visualization output and splitter output
18import xml.parsers.expat
[3441fe3]19
[1af38d6]20from threading import *
[6679c122]21from subprocess import *
22
[ec4fb42]23from util import *
[51cc9df]24from fedid import fedid, generate_fedid
[9460b1e]25from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]26from service_error import service_error
[6679c122]27
[11a08b0]28
29class nullHandler(logging.Handler):
30    def emit(self, record): pass
31
32fl = logging.getLogger("fedd.experiment_control")
33fl.addHandler(nullHandler())
34
[ec4fb42]35class experiment_control_local:
[0ea11af]36    """
37    Control of experiments that this system can directly access.
38
39    Includes experiment creation, termination and information dissemination.
40    Thred safe.
41    """
[79b6596]42
43    class ssh_cmd_timeout(RuntimeError): pass
[4b362df]44
45    class list_log:
46        """
47        Provide an interface that lets logger.StreamHandler s write to a list
48        of strings.
49        """
50        def __init__(self, l=[]):
51            """
52            Link to an existing list or just create a log
53            """
54            self.ll = l
55            self.lock = Lock()
56        def write(self, str):
57            """
58            Add the string to the log.  Lock for consistency.
59            """
60            self.lock.acquire()
61            self.ll.append(str)
62            self.lock.release()
63
64        def flush(self):
65            """
66            No-op that StreamHandlers expect
67            """
68            pass
69
[6679c122]70   
[1af38d6]71    class thread_pool:
[866c983]72        """
73        A class to keep track of a set of threads all invoked for the same
74        task.  Manages the mutual exclusion of the states.
75        """
76        def __init__(self, nthreads):
77            """
78            Start a pool.
79            """
80            self.changed = Condition()
81            self.started = 0
82            self.terminated = 0
83            self.nthreads = nthreads
84
85        def acquire(self):
86            """
87            Get the pool's lock.
88            """
89            self.changed.acquire()
90
91        def release(self):
92            """
93            Release the pool's lock.
94            """
95            self.changed.release()
96
97        def wait(self, timeout = None):
98            """
99            Wait for a pool thread to start or stop.
100            """
101            self.changed.wait(timeout)
102
103        def start(self):
104            """
105            Called by a pool thread to report starting.
106            """
107            self.changed.acquire()
108            self.started += 1
109            self.changed.notifyAll()
110            self.changed.release()
111
112        def terminate(self):
113            """
114            Called by a pool thread to report finishing.
115            """
116            self.changed.acquire()
117            self.terminated += 1
118            self.changed.notifyAll()
119            self.changed.release()
120
121        def clear(self):
122            """
123            Clear all pool data.
124            """
125            self.changed.acquire()
126            self.started = 0
127            self.terminated =0
128            self.changed.notifyAll()
129            self.changed.release()
130
131        def wait_for_slot(self):
132            """
133            Wait until we have a free slot to start another pooled thread
134            """
135            self.acquire()
136            while self.started - self.terminated >= self.nthreads:
137                self.wait()
138            self.release()
139
140        def wait_for_all_done(self):
141            """
142            Wait until all active threads finish (and at least one has started)
143            """
144            self.acquire()
145            while self.started == 0 or self.started > self.terminated:
146                self.wait()
147            self.release()
[8bc5754]148
[1af38d6]149    class pooled_thread(Thread):
[866c983]150        """
151        One of a set of threads dedicated to a specific task.  Uses the
152        thread_pool class above for coordination.
153        """
154        def __init__(self, group=None, target=None, name=None, args=(), 
155                kwargs={}, pdata=None, trace_file=None):
156            Thread.__init__(self, group, target, name, args, kwargs)
157            self.rv = None          # Return value of the ops in this thread
158            self.exception = None   # Exception that terminated this thread
159            self.target=target      # Target function to run on start()
160            self.args = args        # Args to pass to target
161            self.kwargs = kwargs    # Additional kw args
162            self.pdata = pdata      # thread_pool for this class
163            # Logger for this thread
164            self.log = logging.getLogger("fedd.experiment_control")
165       
166        def run(self):
167            """
168            Emulate Thread.run, except add pool data manipulation and error
169            logging.
170            """
171            if self.pdata:
172                self.pdata.start()
173
174            if self.target:
175                try:
176                    self.rv = self.target(*self.args, **self.kwargs)
177                except service_error, s:
178                    self.exception = s
179                    self.log.error("Thread exception: %s %s" % \
180                            (s.code_string(), s.desc))
181                except:
182                    self.exception = sys.exc_info()[1]
183                    self.log.error(("Unexpected thread exception: %s" +\
184                            "Trace %s") % (self.exception,\
185                                traceback.format_exc()))
186            if self.pdata:
187                self.pdata.terminate()
[6679c122]188
[f069052]189    call_RequestAccess = service_caller('RequestAccess')
190    call_ReleaseAccess = service_caller('ReleaseAccess')
191    call_Ns2Split = service_caller('Ns2Split')
[058f58e]192
[3f6bc5f]193    def __init__(self, config=None, auth=None):
[866c983]194        """
195        Intialize the various attributes, most from the config object
196        """
197
198        def parse_tarfile_list(tf):
199            """
200            Parse a tarfile list from the configuration.  This is a set of
201            paths and tarfiles separated by spaces.
202            """
203            rv = [ ]
204            if tf is not None:
205                tl = tf.split()
206                while len(tl) > 1:
207                    p, t = tl[0:2]
208                    del tl[0:2]
209                    rv.append((p, t))
210            return rv
211
212        self.thread_with_rv = experiment_control_local.pooled_thread
213        self.thread_pool = experiment_control_local.thread_pool
[bd3e314]214        self.list_log = experiment_control_local.list_log
[866c983]215
216        self.cert_file = config.get("experiment_control", "cert_file")
217        if self.cert_file:
218            self.cert_pwd = config.get("experiment_control", "cert_pwd")
219        else:
220            self.cert_file = config.get("globals", "cert_file")
221            self.cert_pwd = config.get("globals", "cert_pwd")
222
223        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
224                or config.get("globals", "trusted_certs")
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 2
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
[866c983]263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'Create': soap_handler('Create', self.create_experiment),
337                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
338                'Vis': soap_handler('Vis', self.get_vis),
339                'Info': soap_handler('Info', self.get_info),
[65f3f29]340                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[866c983]341                'Terminate': soap_handler('Terminate', 
342                    self.terminate_experiment),
343        }
344
345        self.xmlrpc_services = {\
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
[19cc408]354
[6679c122]355    def copy_file(self, src, dest, size=1024):
[866c983]356        """
357        Exceedingly simple file copy.
358        """
359        s = open(src,'r')
360        d = open(dest, 'w')
361
362        buf = "x"
363        while buf != "":
364            buf = s.read(size)
365            d.write(buf)
366        s.close()
367        d.close()
[6679c122]368
[a97394b]369    # Call while holding self.state_lock
[eee2b2e]370    def write_state(self):
[866c983]371        """
372        Write a new copy of experiment state after copying the existing state
373        to a backup.
374
375        State format is a simple pickling of the state dictionary.
376        """
377        if os.access(self.state_filename, os.W_OK):
378            self.copy_file(self.state_filename, \
379                    "%s.bak" % self.state_filename)
380        try:
381            f = open(self.state_filename, 'w')
382            pickle.dump(self.state, f)
383        except IOError, e:
384            self.log.error("Can't write file %s: %s" % \
385                    (self.state_filename, e))
386        except pickle.PicklingError, e:
387            self.log.error("Pickling problem: %s" % e)
388        except TypeError, e:
389            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]390
[a97394b]391    # Call while holding self.state_lock
[eee2b2e]392    def read_state(self):
[866c983]393        """
394        Read a new copy of experiment state.  Old state is overwritten.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for k in self.state.keys():
411            try:
412                # This list should only have one element in it, but phrasing it
413                # as a for loop doesn't cost much, really.  We have to find the
414                # fedid elements anyway.
415                for eid in [ f['fedid'] \
416                        for f in self.state[k]['experimentID']\
417                            if f.has_key('fedid') ]:
418                    self.auth.set_attribute(self.state[k]['owner'], eid)
[ca489e8]419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
[866c983]422            except KeyError, e:
423                self.log.warning("[read_state]: State ownership or identity " +\
424                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]425
426
427    def read_accessdb(self, accessdb_file):
[866c983]428        """
429        Read the mapping from fedids that can create experiments to their name
430        in the 3-level access namespace.  All will be asserted from this
431        testbed and can include the local username and porject that will be
432        asserted on their behalf by this fedd.  Each fedid is also added to the
433        authorization system with the "create" attribute.
434        """
435        self.accessdb = {}
436        # These are the regexps for parsing the db
437        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
438        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
439                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
440        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
441                "\s*->\s*(" + name_expr + ")\s*$")
442        lineno = 0
443
444        # Parse the mappings and store in self.authdb, a dict of
445        # fedid -> (proj, user)
446        try:
447            f = open(accessdb_file, "r")
448            for line in f:
449                lineno += 1
450                line = line.strip()
451                if len(line) == 0 or line.startswith('#'):
452                    continue
453                m = project_line.match(line)
454                if m:
455                    fid = fedid(hexstr=m.group(1))
456                    project, user = m.group(2,3)
457                    if not self.accessdb.has_key(fid):
458                        self.accessdb[fid] = []
459                    self.accessdb[fid].append((project, user))
460                    continue
461
462                m = user_line.match(line)
463                if m:
464                    fid = fedid(hexstr=m.group(1))
465                    project = None
466                    user = m.group(2)
467                    if not self.accessdb.has_key(fid):
468                        self.accessdb[fid] = []
469                    self.accessdb[fid].append((project, user))
470                    continue
471                self.log.warn("[experiment_control] Error parsing access " +\
472                        "db %s at line %d" %  (accessdb_file, lineno))
473        except IOError:
474            raise service_error(service_error.internal,
475                    "Error opening/reading %s as experiment " +\
476                            "control accessdb" %  accessdb_file)
477        f.close()
478
479        # Initialize the authorization attributes
480        for fid in self.accessdb.keys():
481            self.auth.set_attribute(fid, 'create')
[34bc05c]482
483    def read_mapdb(self, file):
[866c983]484        """
485        Read a simple colon separated list of mappings for the
486        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
487        """
488
489        self.tbmap = { }
490        lineno =0
491        try:
492            f = open(file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if line.startswith('#') or len(line) == 0:
497                    continue
498                try:
499                    label, url = line.split(':', 1)
500                    self.tbmap[label] = url
501                except ValueError, e:
502                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
503                            "map db: %s %s" % (lineno, line, e))
504        except IOError, e:
505            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
506                    "open %s: %s" % (file, e))
507        f.close()
[eee2b2e]508
[4b362df]509    class emulab_segment:
510        def __init__(self, log=None, keyfile=None, debug=False):
511            self.log = log or logging.getLogger(\
512                    'fedd.experiment_control.emulab_segment')
513            self.ssh_privkey_file = keyfile
514            self.debug = debug
515            self.ssh_exec="/usr/bin/ssh"
516            self.scp_exec = "/usr/bin/scp"
517            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
518
519        def scp_file(self, file, user, host, dest=""):
520            """
521            scp a file to the remote host.  If debug is set the action is only
522            logged.
523            """
524
525            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
526                    '-o', 'StrictHostKeyChecking yes', '-i', 
527                    self.ssh_privkey_file, file, 
528                    "%s@%s:%s" % (user, host, dest)]
529            rv = 0
[d0ae12d]530
[4b362df]531            try:
532                dnull = open("/dev/null", "w")
533            except IOError:
534                self.log.debug("[ssh_file]: failed to open " + \
535                        "/dev/null for redirect")
536                dnull = Null
537
538            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
539            if not self.debug:
[4ea1e22]540                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
541                        close_fds=True)
[4b362df]542
543            return rv == 0
544
545        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
546            """
[a74ea78]547            Run a remote command on host as user.  If debug is set, the action
548            is only logged.  Commands are run without stdin, to avoid stray
549            SIGTTINs.
[4b362df]550            """
[a74ea78]551            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
[4b362df]552                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
553                    (self.ssh_exec, self.ssh_privkey_file, 
554                            user, host, cmd)
[866c983]555
[4b362df]556            try:
[886307f]557                dnull = open("/dev/null", "w")
[4b362df]558            except IOError:
559                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
560                        "for redirect")
561                dnull = Null
562
563            self.log.debug("[ssh_cmd]: %s" % sh_str)
564            if not self.debug:
565                if dnull:
[4ea1e22]566                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
567                            close_fds=True)
[4b362df]568                else:
[4ea1e22]569                    sub = Popen(sh_str, shell=True,
570                            close_fds=True)
[4b362df]571                if timeout:
572                    i = 0
573                    rv = sub.poll()
574                    while i < timeout:
575                        if rv is not None: break
576                        else:
577                            time.sleep(1)
578                            rv = sub.poll()
579                            i += 1
[79b6596]580                    else:
[4b362df]581                        self.log.debug("Process exceeded runtime: %s" % sh_str)
582                        os.kill(sub.pid, signal.SIGKILL)
583                        raise self.ssh_cmd_timeout();
584                    return rv == 0
[79b6596]585                else:
[4b362df]586                    return sub.wait() == 0
[79b6596]587            else:
[d5e3b8e]588                if timeout == 0:
589                    self.log.debug("debug timeout raised on %s " % sh_str)
590                    raise self.ssh_cmd_timeout()
591                else:
592                    return True
[4b362df]593
594    class start_segment(emulab_segment):
595        def __init__(self, log=None, keyfile=None, debug=False):
596            experiment_control_local.emulab_segment.__init__(self,
597                    log=log, keyfile=keyfile, debug=debug)
598
599        def create_config_tree(self, src_dir, dest_dir, script):
600            """
601            Append commands to script that will create the directory hierarchy
602            on the remote federant.
603            """
604
605            if os.path.isdir(src_dir):
606                print >>script, "mkdir -p %s" % dest_dir
607                print >>script, "chmod 770 %s" % dest_dir
608
609                for f in os.listdir(src_dir):
610                    if os.path.isdir(f):
611                        self.create_config_tree("%s/%s" % (src_dir, f), 
612                                "%s/%s" % (dest_dir, f), script)
613            else:
614                self.log.debug("[create_config_tree]: Not a directory: %s" \
615                        % src_dir)
[c2dbca8]616
[4b362df]617        def ship_configs(self, host, user, src_dir, dest_dir):
618            """
619            Copy federant-specific configuration files to the federant.
620            """
[c2dbca8]621            for f in os.listdir(src_dir):
622                if os.path.isdir(f):
[4b362df]623                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
624                            "%s/%s" % (dest_dir, f)):
625                        return False
626                else:
627                    if not self.scp_file("%s/%s" % (src_dir, f), 
628                            user, host, dest_dir):
629                        return False
630            return True
631
632        def get_state(self, user, host, tb, pid, eid):
633            # command to test experiment state
634            expinfo_exec = "/usr/testbed/bin/expinfo" 
635            # Regular expressions to parse the expinfo response
636            state_re = re.compile("State:\s+(\w+)")
637            no_exp_re = re.compile("^No\s+such\s+experiment")
638            swapping_re = re.compile("^No\s+information\s+available.")
639            state = None    # Experiment state parsed from expinfo
640            # The expinfo ssh command.  Note the identity restriction to use
641            # only the identity provided in the pubkey given.
642            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
643                    'StrictHostKeyChecking yes', '-i', 
644                    self.ssh_privkey_file, "%s@%s" % (user, host), 
645                    expinfo_exec, pid, eid]
646
647            dev_null = None
648            try:
649                dev_null = open("/dev/null", "a")
650            except IOError, e:
651                self.log.error("[get_state]: can't open /dev/null: %s" %e)
[79b6596]652
[4b362df]653            if self.debug:
654                state = 'swapped'
655                rv = 0
656            else:
[4ea1e22]657                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
658                        close_fds=True)
[4b362df]659                for line in status.stdout:
660                    m = state_re.match(line)
661                    if m: state = m.group(1)
662                    else:
663                        for reg, st in ((no_exp_re, "none"),
664                                (swapping_re, "swapping")):
665                            m = reg.match(line)
666                            if m: state = st
667                rv = status.wait()
668
669            # If the experiment is not present the subcommand returns a
670            # non-zero return value.  If we successfully parsed a "none"
671            # outcome, ignore the return code.
672            if rv != 0 and state != 'none':
673                raise service_error(service_error.internal,
674                        "Cannot get status of segment %s:%s/%s" % \
675                                (tb, pid, eid))
676            elif state not in ('active', 'swapped', 'swapping', 'none'):
677                raise service_error(service_error.internal,
678                        "Cannot get status of segment %s:%s/%s" % \
679                                (tb, pid, eid))
680            else: return state
681
682
683        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
684            """
685            Start a sub-experiment on a federant.
686
687            Get the current state, modify or create as appropriate, ship data
688            and configs and start the experiment.  There are small ordering
689            differences based on the initial state of the sub-experiment.
690            """
691            # ops node in the federant
692            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
693            user = tbparams[tb]['user']     # federant user
694            pid = tbparams[tb]['project']   # federant project
695            # XXX
696            base_confs = ( "hosts",)
697            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
698            # Configuration directories on the remote machine
699            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
700            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
701            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
702
703            state = self.get_state(user, host, tb, pid, eid)
704
705            self.log.debug("[start_segment]: %s: %s" % (tb, state))
706            self.log.info("[start_segment]:transferring experiment to %s" % tb)
707
708            if not self.scp_file("%s/%s/%s" % \
709                    (tmpdir, tb, tclfile), user, host):
710                return False
711           
712            if state == 'none':
713                # Create a null copy of the experiment so that we capture any
714                # logs there if the modify fails.  Emulab software discards the
715                # logs from a failed startexp
716                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
717                    return False
718                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
719                timedout = False
720                try:
721                    if not self.ssh_cmd(user, host,
722                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
723                            "-e %s null.tcl") % (pid, eid), "startexp",
724                            timeout=60 * 10):
725                        return False
726                except self.ssh_cmd_timeout:
727                    timedout = True
728
729                if timedout:
[c570f7e]730                    state = self.get_state(user, host, tb, pid, eid)
[4b362df]731                    if state != "swapped":
732                        return False
733
734           
735            # Open up a temporary file to contain a script for setting up the
736            # filespace for the new experiment.
737            self.log.info("[start_segment]: creating script file")
738            try:
739                sf, scriptname = tempfile.mkstemp()
740                scriptfile = os.fdopen(sf, 'w')
741            except IOError:
742                return False
[866c983]743
[4b362df]744            scriptbase = os.path.basename(scriptname)
745
746            # Script the filesystem changes
747            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
748            # Clear and create the tarfiles and rpm directories
749            for d in (tarfiles_dir, rpms_dir):
750                print >>scriptfile, "/bin/rm -rf %s/*" % d
751                print >>scriptfile, "mkdir -p %s" % d
752            print >>scriptfile, 'mkdir -p %s' % proj_dir
753            self.create_config_tree("%s/%s" % (tmpdir, tb),
754                    proj_dir, scriptfile)
755            if os.path.isdir("%s/tarfiles" % tmpdir):
756                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
757                        scriptfile)
758            if os.path.isdir("%s/rpms" % tmpdir):
759                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 
760                        scriptfile)
761            print >>scriptfile, "rm -f %s" % scriptbase
762            scriptfile.close()
763
764            # Move the script to the remote machine
765            # XXX: could collide tempfile names on the remote host
766            if self.scp_file(scriptname, user, host, scriptbase):
767                os.remove(scriptname)
768            else:
769                return False
[866c983]770
[4b362df]771            # Execute the script (and the script's last line deletes it)
772            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
773                return False
[79b6596]774
[4b362df]775            for f in base_confs:
776                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
777                        "%s/%s" % (proj_dir, f)):
[79b6596]778                    return False
[4b362df]779            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
780                    proj_dir):
[79b6596]781                return False
[4b362df]782            if os.path.isdir("%s/tarfiles" % tmpdir):
783                if not self.ship_configs(host, user,
784                        "%s/tarfiles" % tmpdir, tarfiles_dir):
785                    return False
786            if os.path.isdir("%s/rpms" % tmpdir):
787                if not self.ship_configs(host, user,
788                        "%s/rpms" % tmpdir, tarfiles_dir):
789                    return False
790            # Stage the new configuration (active experiments will stay swapped
791            # in now)
792            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
[79b6596]793            try:
794                if not self.ssh_cmd(user, host,
[4b362df]795                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
796                                (pid, eid, tclfile),
797                        "modexp", timeout= 60 * 10):
[79b6596]798                    return False
799            except self.ssh_cmd_timeout:
[d5e3b8e]800                self.log.error("Modify command failed to complete in time")
[4b362df]801                # There's really no way to see if this succeeded or failed, so
802                # if it hangs, assume the worst.
803                return False
804            # Active experiments are still swapped, this swaps the others in.
805            if state != 'active':
806                self.log.info("[start_segment]: Swapping %s in on %s" % \
807                        (eid, tb))
808                timedout = False
809                try:
810                    if not self.ssh_cmd(user, host,
811                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
812                            "swapexp", timeout=10*60):
813                        return False
814                except self.ssh_cmd_timeout:
815                    timedout = True
816               
817                # If the command was terminated, but completed successfully,
818                # report success.
819                if timedout:
[d5e3b8e]820                    self.log.debug("[start_segment]: swapin timed out " +\
821                            "checking state")
[c570f7e]822                    state = self.get_state(user, host, tb, pid, eid)
[d5e3b8e]823                    self.log.debug("[start_segment]: state is %s" % state)
[4b362df]824                    return state == 'active'
825            # Everything has gone OK.
826            return True
827
828    class stop_segment(emulab_segment):
829        def __init__(self, log=None, keyfile=None, debug=False):
830            experiment_control_local.emulab_segment.__init__(self,
831                    log=log, keyfile=keyfile, debug=debug)
832
833        def __call__(self, tb, eid, tbparams):
834            """
835            Stop a sub experiment by calling swapexp on the federant
836            """
837            user = tbparams[tb]['user']
838            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
839            pid = tbparams[tb]['project']
840
841            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
[3c20a31]842            rv = False
843            try:
844                # Clean out tar files: we've gone over quota in the past
845                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
846                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
847                        (pid, eid))
848                rv = self.ssh_cmd(user, host,
849                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
850            except self.ssh_cmd_timeout:
851                rv = False
852            return rv
[866c983]853
854       
[6679c122]855    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]856        """
857        Generate a set of keys for the gateways to use to talk.
858
859        Keys are of type type and are stored in the required dest file.
860        """
861        valid_types = ("rsa", "dsa")
862        t = type.lower();
863        if t not in valid_types: raise ValueError
864        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
865
866        try:
867            trace = open("/dev/null", "w")
868        except IOError:
869            raise service_error(service_error.internal,
870                    "Cannot open /dev/null??");
871
872        # May raise CalledProcessError
873        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]874        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]875        if rv != 0:
876            raise service_error(service_error.internal, 
877                    "Cannot generate nonce ssh keys.  %s return code %d" \
878                            % (self.ssh_keygen, rv))
[6679c122]879
[0d830de]880    def gentopo(self, str):
[866c983]881        """
882        Generate the topology dtat structure from the splitter's XML
883        representation of it.
884
885        The topology XML looks like:
886            <experiment>
887                <nodes>
888                    <node><vname></vname><ips>ip1:ip2</ips></node>
889                </nodes>
890                <lans>
891                    <lan>
892                        <vname></vname><vnode></vnode><ip></ip>
893                        <bandwidth></bandwidth><member>node:port</member>
894                    </lan>
895                </lans>
896        """
897        class topo_parse:
898            """
899            Parse the topology XML and create the dats structure.
900            """
901            def __init__(self):
902                # Typing of the subelements for data conversion
903                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
904                self.int_subelements = ( 'bandwidth',)
905                self.float_subelements = ( 'delay',)
906                # The final data structure
907                self.nodes = [ ]
908                self.lans =  [ ]
909                self.topo = { \
910                        'node': self.nodes,\
911                        'lan' : self.lans,\
912                    }
913                self.element = { }  # Current element being created
914                self.chars = ""     # Last text seen
915
916            def end_element(self, name):
917                # After each sub element the contents is added to the current
918                # element or to the appropriate list.
919                if name == 'node':
920                    self.nodes.append(self.element)
921                    self.element = { }
922                elif name == 'lan':
923                    self.lans.append(self.element)
924                    self.element = { }
925                elif name in self.str_subelements:
926                    self.element[name] = self.chars
927                    self.chars = ""
928                elif name in self.int_subelements:
929                    self.element[name] = int(self.chars)
930                    self.chars = ""
931                elif name in self.float_subelements:
932                    self.element[name] = float(self.chars)
933                    self.chars = ""
934
935            def found_chars(self, data):
936                self.chars += data.rstrip()
937
938
939        tp = topo_parse();
940        parser = xml.parsers.expat.ParserCreate()
941        parser.EndElementHandler = tp.end_element
942        parser.CharacterDataHandler = tp.found_chars
943
944        parser.Parse(str)
945
946        return tp.topo
947       
[0d830de]948
949    def genviz(self, topo):
[866c983]950        """
951        Generate the visualization the virtual topology
952        """
953
954        neato = "/usr/local/bin/neato"
955        # These are used to parse neato output and to create the visualization
956        # file.
957        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
958        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
959                "%s</type></node>"
960
961        try:
962            # Node names
963            nodes = [ n['vname'] for n in topo['node'] ]
964            topo_lans = topo['lan']
965        except KeyError:
966            raise service_error(service_error.internal, "Bad topology")
967
968        lans = { }
969        links = { }
970
971        # Walk through the virtual topology, organizing the connections into
972        # 2-node connections (links) and more-than-2-node connections (lans).
973        # When a lan is created, it's added to the list of nodes (there's a
974        # node in the visualization for the lan).
975        for l in topo_lans:
976            if links.has_key(l['vname']):
977                if len(links[l['vname']]) < 2:
978                    links[l['vname']].append(l['vnode'])
979                else:
980                    nodes.append(l['vname'])
981                    lans[l['vname']] = links[l['vname']]
982                    del links[l['vname']]
983                    lans[l['vname']].append(l['vnode'])
984            elif lans.has_key(l['vname']):
985                lans[l['vname']].append(l['vnode'])
986            else:
987                links[l['vname']] = [ l['vnode'] ]
988
989
990        # Open up a temporary file for dot to turn into a visualization
991        try:
992            df, dotname = tempfile.mkstemp()
993            dotfile = os.fdopen(df, 'w')
994        except IOError:
995            raise service_error(service_error.internal,
996                    "Failed to open file in genviz")
997
[886307f]998        try:
999            dnull = open('/dev/null', 'w')
1000        except IOError:
1001            service_error(service_error.internal,
1002                    "Failed to open /dev/null in genviz")
1003
[866c983]1004        # Generate a dot/neato input file from the links, nodes and lans
1005        try:
1006            print >>dotfile, "graph G {"
1007            for n in nodes:
1008                print >>dotfile, '\t"%s"' % n
1009            for l in links.keys():
1010                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
1011            for l in lans.keys():
1012                for n in lans[l]:
1013                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
1014            print >>dotfile, "}"
1015            dotfile.close()
1016        except TypeError:
1017            raise service_error(service_error.internal,
1018                    "Single endpoint link in vtopo")
1019        except IOError:
1020            raise service_error(service_error.internal, "Cannot write dot file")
1021
1022        # Use dot to create a visualization
1023        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
[886307f]1024                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
1025                close_fds=True)
1026        dnull.close()
[866c983]1027
1028        # Translate dot to vis format
1029        vis_nodes = [ ]
1030        vis = { 'node': vis_nodes }
1031        for line in dot.stdout:
1032            m = vis_re.match(line)
1033            if m:
1034                vn = m.group(1)
1035                vis_node = {'name': vn, \
1036                        'x': float(m.group(2)),\
1037                        'y' : float(m.group(3)),\
1038                    }
1039                if vn in links.keys() or vn in lans.keys():
1040                    vis_node['type'] = 'lan'
1041                else:
1042                    vis_node['type'] = 'node'
1043                vis_nodes.append(vis_node)
1044        rv = dot.wait()
1045
1046        os.remove(dotname)
1047        if rv == 0 : return vis
1048        else: return None
[d0ae12d]1049
[4064742]1050    def get_access(self, tb, nodes, user, tbparam, master, export_project,
[866c983]1051            access_user):
1052        """
1053        Get access to testbed through fedd and set the parameters for that tb
1054        """
1055        uri = self.tbmap.get(tb, None)
1056        if not uri:
1057            raise service_error(serice_error.server_config, 
1058                    "Unknown testbed: %s" % tb)
1059
1060        # currently this lumps all users into one service access group
1061        service_keys = [ a for u in user \
1062                for a in u.get('access', []) \
1063                    if a.has_key('sshPubkey')]
1064
1065        if len(service_keys) == 0:
1066            raise service_error(service_error.req, 
1067                    "Must have at least one SSH pubkey for services")
1068
1069
1070        for p, u in access_user:
1071            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
1072                    "to %s") %  ((p or "None"), u, uri))
1073
1074            if p:
1075                # Request with user and project specified
1076                req = {\
1077                        'destinationTestbed' : { 'uri' : uri },
1078                        'project': { 
1079                            'name': {'localname': p},
1080                            'user': [ {'userID': { 'localname': u } } ],
1081                            },
1082                        'user':  user,
1083                        'allocID' : { 'localname': 'test' },
1084                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1085                        'serviceAccess' : service_keys
1086                    }
1087            else:
1088                # Request with only user specified
1089                req = {\
1090                        'destinationTestbed' : { 'uri' : uri },
1091                        'user':  [ {'userID': { 'localname': u } } ],
1092                        'allocID' : { 'localname': 'test' },
1093                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
1094                        'serviceAccess' : service_keys
1095                    }
1096
1097            if tb == master:
1098                # NB, the export_project parameter is a dict that includes
1099                # the type
1100                req['exportProject'] = export_project
1101
1102            # node resources if any
1103            if nodes != None and len(nodes) > 0:
1104                rnodes = [ ]
1105                for n in nodes:
1106                    rn = { }
1107                    image, hw, count = n.split(":")
1108                    if image: rn['image'] = [ image ]
1109                    if hw: rn['hardware'] = [ hw ]
1110                    if count and int(count) >0 : rn['count'] = int(count)
1111                    rnodes.append(rn)
1112                req['resources']= { }
1113                req['resources']['node'] = rnodes
1114
1115            try:
1116                if self.local_access.has_key(uri):
1117                    # Local access call
1118                    req = { 'RequestAccessRequestBody' : req }
1119                    r = self.local_access[uri].RequestAccess(req, 
1120                            fedid(file=self.cert_file))
1121                    r = { 'RequestAccessResponseBody' : r }
1122                else:
1123                    r = self.call_RequestAccess(uri, req, 
1124                            self.cert_file, self.cert_pwd, self.trusted_certs)
1125            except service_error, e:
1126                if e.code == service_error.access:
1127                    self.log.debug("[get_access] Access denied")
1128                    r = None
1129                    continue
1130                else:
1131                    raise e
1132
1133            if r.has_key('RequestAccessResponseBody'):
1134                # Through to here we have a valid response, not a fault.
1135                # Access denied is a fault, so something better or worse than
1136                # access denied has happened.
1137                r = r['RequestAccessResponseBody']
1138                self.log.debug("[get_access] Access granted")
1139                break
1140            else:
1141                raise service_error(service_error.protocol,
1142                        "Bad proxy response")
1143       
1144        if not r:
1145            raise service_error(service_error.access, 
1146                    "Access denied by %s (%s)" % (tb, uri))
1147
1148        e = r['emulab']
1149        p = e['project']
1150        tbparam[tb] = { 
1151                "boss": e['boss'],
1152                "host": e['ops'],
1153                "domain": e['domain'],
1154                "fs": e['fileServer'],
1155                "eventserver": e['eventServer'],
1156                "project": unpack_id(p['name']),
1157                "emulab" : e,
1158                "allocID" : r['allocID'],
1159                }
1160        # Make the testbed name be the label the user applied
1161        p['testbed'] = {'localname': tb }
1162
1163        for u in p['user']:
1164            role = u.get('role', None)
1165            if role == 'experimentCreation':
1166                tbparam[tb]['user'] = unpack_id(u['userID'])
1167                break
1168        else:
1169            raise service_error(service_error.internal, 
1170                    "No createExperimentUser from %s" %tb)
1171
1172        # Add attributes to barameter space.  We don't allow attributes to
1173        # overlay any parameters already installed.
1174        for a in e['fedAttr']:
1175            try:
1176                if a['attribute'] and isinstance(a['attribute'], basestring)\
1177                        and not tbparam[tb].has_key(a['attribute'].lower()):
1178                    tbparam[tb][a['attribute'].lower()] = a['value']
1179            except KeyError:
1180                self.log.error("Bad attribute in response: %s" % a)
1181       
[d81971a]1182    def release_access(self, tb, aid):
[866c983]1183        """
1184        Release access to testbed through fedd
1185        """
1186
1187        uri = self.tbmap.get(tb, None)
1188        if not uri:
1189            raise service_error(serice_error.server_config, 
1190                    "Unknown testbed: %s" % tb)
1191
1192        if self.local_access.has_key(uri):
1193            resp = self.local_access[uri].ReleaseAccess(\
1194                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1195                    fedid(file=self.cert_file))
1196            resp = { 'ReleaseAccessResponseBody': resp } 
1197        else:
1198            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1199                    self.cert_file, self.cert_pwd, self.trusted_certs)
1200
1201        # better error coding
[d81971a]1202
[f4f4117]1203    def remote_splitter(self, uri, desc, master):
1204
[866c983]1205        req = {
1206                'description' : { 'ns2description': desc },
1207                'master': master,
1208                'include_fedkit': bool(self.fedkit),
1209                'include_gatewaykit': bool(self.gatewaykit)
1210            }
1211
1212        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1213                self.trusted_certs)
1214
1215        if r.has_key('Ns2SplitResponseBody'):
1216            r = r['Ns2SplitResponseBody']
1217            if r.has_key('output'):
1218                return r['output'].splitlines()
1219            else:
1220                raise service_error(service_error.protocol, 
1221                        "Bad splitter response (no output)")
1222        else:
1223            raise service_error(service_error.protocol, "Bad splitter response")
1224       
[6679c122]1225    class current_testbed:
[866c983]1226        """
1227        Object for collecting the current testbed description.  The testbed
1228        description is saved to a file with the local testbed variables
1229        subsittuted line by line.
1230        """
1231        def __init__(self, eid, tmpdir, fedkit, gatewaykit):
1232            def tar_list_to_string(tl):
1233                if tl is None: return None
1234
1235                rv = ""
1236                for t in tl:
1237                    rv += " %s PROJDIR/tarfiles/EID/%s" % \
1238                            (t[0], os.path.basename(t[1]))
1239                return rv
1240
1241
1242            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1243            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1244            self.current_testbed = None
1245            self.testbed_file = None
1246
1247            self.def_expstart = \
1248                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1249            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1250            self.def_gwstart = \
1251                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1252            self.def_mgwstart = \
1253                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1254            self.def_gwimage = "FBSD61-TUNNEL2";
1255            self.def_gwtype = "pc";
1256            self.def_mgwcmd = '# '
1257            self.def_mgwcmdparams = ''
1258            self.def_gwcmd = '# '
1259            self.def_gwcmdparams = ''
1260
1261            self.eid = eid
1262            self.tmpdir = tmpdir
1263            # Convert fedkit and gateway kit (which are lists of tuples) into a
1264            # substituition string.
1265            self.fedkit = tar_list_to_string(fedkit)
1266            self.gatewaykit = tar_list_to_string(gatewaykit)
1267
1268        def __call__(self, line, master, allocated, tbparams):
1269            # Capture testbed topology descriptions
1270            if self.current_testbed == None:
1271                m = self.begin_testbed.match(line)
1272                if m != None:
1273                    self.current_testbed = m.group(1)
1274                    if self.current_testbed == None:
1275                        raise service_error(service_error.req,
1276                                "Bad request format (unnamed testbed)")
1277                    allocated[self.current_testbed] = \
1278                            allocated.get(self.current_testbed,0) + 1
1279                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1280                    if not os.path.exists(tb_dir):
1281                        try:
1282                            os.mkdir(tb_dir)
1283                        except IOError:
1284                            raise service_error(service_error.internal,
1285                                    "Cannot create %s" % tb_dir)
1286                    try:
1287                        self.testbed_file = open("%s/%s.%s.tcl" %
1288                                (tb_dir, self.eid, self.current_testbed), 'w')
1289                    except IOError:
1290                        self.testbed_file = None
1291                    return True
1292                else: return False
1293            else:
1294                m = self.end_testbed.match(line)
1295                if m != None:
1296                    if m.group(1) != self.current_testbed:
1297                        raise service_error(service_error.internal, 
1298                                "Mismatched testbed markers!?")
1299                    if self.testbed_file != None: 
1300                        self.testbed_file.close()
1301                        self.testbed_file = None
1302                    self.current_testbed = None
1303                elif self.testbed_file:
1304                    # Substitute variables and put the line into the local
1305                    # testbed file.
1306                    gwtype = tbparams[self.current_testbed].get(\
1307                            'connectortype', self.def_gwtype)
1308                    gwimage = tbparams[self.current_testbed].get(\
1309                            'connectorimage', self.def_gwimage)
1310                    mgwstart = tbparams[self.current_testbed].get(\
1311                            'masterconnectorstartcmd', self.def_mgwstart)
1312                    mexpstart = tbparams[self.current_testbed].get(\
1313                            'masternodestartcmd', self.def_mexpstart)
1314                    gwstart = tbparams[self.current_testbed].get(\
1315                            'slaveconnectorstartcmd', self.def_gwstart)
1316                    expstart = tbparams[self.current_testbed].get(\
1317                            'slavenodestartcmd', self.def_expstart)
1318                    project = tbparams[self.current_testbed].get('project')
1319                    gwcmd = tbparams[self.current_testbed].get(\
1320                            'slaveconnectorcmd', self.def_gwcmd)
1321                    gwcmdparams = tbparams[self.current_testbed].get(\
1322                            'slaveconnectorcmdparams', self.def_gwcmdparams)
1323                    mgwcmd = tbparams[self.current_testbed].get(\
1324                            'masterconnectorcmd', self.def_gwcmd)
1325                    mgwcmdparams = tbparams[self.current_testbed].get(\
1326                            'masterconnectorcmdparams', self.def_gwcmdparams)
1327                    line = re.sub("GWTYPE", gwtype, line)
1328                    line = re.sub("GWIMAGE", gwimage, line)
1329                    if self.current_testbed == master:
1330                        line = re.sub("GWSTART", mgwstart, line)
1331                        line = re.sub("EXPSTART", mexpstart, line)
1332                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1333                        line = re.sub("GWCMDPARAMS", mgwcmdparams, line)
1334                        line = re.sub("(#\s*)?GWCMD", mgwcmd, line)
1335                    else:
1336                        line = re.sub("GWSTART", gwstart, line)
1337                        line = re.sub("EXPSTART", expstart, line)
1338                        # NB GWCMDPARAMS is a prefix of GWCMD, so expand first
1339                        line = re.sub("GWCMDPARAMS", gwcmdparams, line)
1340                        line = re.sub("(#\s*)?GWCMD", gwcmd, line)
1341                    #These expansions contain EID and PROJDIR.  NB these are
1342                    # local fedkit and gatewaykit, which are strings.
1343                    if self.fedkit:
1344                        line = re.sub("FEDKIT", self.fedkit, line)
1345                    if self.gatewaykit:
1346                        line = re.sub("GATEWAYKIT", self.gatewaykit, line)
1347                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1348                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1349                    line = re.sub("EID", self.eid, line)
1350                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1351                            (project, self.eid), line)
1352                    print >>self.testbed_file, line
1353                return True
[6679c122]1354
1355    class allbeds:
[866c983]1356        """
1357        Process the Allbeds section.  Get access to each federant and save the
1358        parameters in tbparams
1359        """
1360        def __init__(self, get_access):
1361            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1362            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1363            self.in_allbeds = False
1364            self.get_access = get_access
1365
1366        def __call__(self, line, user, tbparams, master, export_project,
1367                access_user):
1368            # Testbed access parameters
1369            if not self.in_allbeds:
1370                if self.begin_allbeds.match(line):
1371                    self.in_allbeds = True
1372                    return True
1373                else:
1374                    return False
1375            else:
1376                if self.end_allbeds.match(line):
1377                    self.in_allbeds = False
1378                else:
1379                    nodes = line.split('|')
1380                    tb = nodes.pop(0)
1381                    self.get_access(tb, nodes, user, tbparams, master,
1382                            export_project, access_user)
1383                return True
[6679c122]1384
1385    class gateways:
[866c983]1386        def __init__(self, eid, master, tmpdir, gw_pubkey,
1387                gw_secretkey, copy_file, fedkit):
1388            self.begin_gateways = \
1389                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1390            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1391            self.current_gateways = None
1392            self.control_gateway = None
1393            self.active_end = { }
1394
1395            self.eid = eid
1396            self.master = master
1397            self.tmpdir = tmpdir
1398            self.gw_pubkey_base = gw_pubkey
1399            self.gw_secretkey_base = gw_secretkey
1400
1401            self.copy_file = copy_file
1402            self.fedkit = fedkit
1403
1404
1405        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1406                active_end, tbparams, dtb, myname, desthost, type):
1407            """
1408            Produce a gateway configuration file from a gateways line.
1409            """
1410
1411            sproject = tbparams[gw].get('project', 'project')
1412            dproject = tbparams[dtb].get('project', 'project')
1413            sdomain = ".%s.%s%s" % (eid, sproject,
1414                    tbparams[gw].get('domain', ".example.com"))
1415            ddomain = ".%s.%s%s" % (eid, dproject,
1416                    tbparams[dtb].get('domain', ".example.com"))
1417            boss = tbparams[master].get('boss', "boss")
1418            fs = tbparams[master].get('fs', "fs")
1419            event_server = "%s%s" % \
1420                    (tbparams[gw].get('eventserver', "event_server"),
1421                            tbparams[gw].get('domain', "example.com"))
1422            remote_event_server = "%s%s" % \
1423                    (tbparams[dtb].get('eventserver', "event_server"),
1424                            tbparams[dtb].get('domain', "example.com"))
1425            seer_control = "%s%s" % \
1426                    (tbparams[gw].get('control', "control"), sdomain)
1427            tunnel_iface = tbparams[gw].get("tunnelinterface", None)
1428
1429            if self.fedkit:
1430                remote_script_dir = "/usr/local/federation/bin"
1431                local_script_dir = "/usr/local/federation/bin"
1432            else:
1433                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1434                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1435
1436            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1437            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1438            tunnel_cfg = tbparams[gw].get("tunnelcfg", "false")
1439
1440            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1441            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1442
1443            # translate to lower case so the `hostname` hack for specifying
1444            # configuration files works.
1445            conf_file = conf_file.lower();
1446            remote_conf_file = remote_conf_file.lower();
1447
1448            if dtb == master:
1449                active = "false"
1450            elif gw == master:
1451                active = "true"
1452            elif active_end.has_key('%s-%s' % (dtb, gw)):
1453                active = "false"
1454            else:
1455                active_end['%s-%s' % (gw, dtb)] = 1
1456                active = "true"
1457
1458            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1459            print >>gwconfig, "Active: %s" % active
1460            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1461            if tunnel_iface:
1462                print >>gwconfig, "Interface: %s" % tunnel_iface
1463            print >>gwconfig, "BossName: %s" % boss
1464            print >>gwconfig, "FsName: %s" % fs
1465            print >>gwconfig, "EventServerName: %s" % event_server
1466            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1467            print >>gwconfig, "SeerControl: %s" % seer_control
1468            print >>gwconfig, "Type: %s" % type
1469            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1470            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1471                    local_script_dir
1472            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1473            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1474            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1475                    (remote_conf_dir, remote_conf_file)
1476            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1477            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1478            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1479            gwconfig.close()
1480
1481            return active == "true"
1482
1483        def __call__(self, line, allocated, tbparams):
1484            # Process gateways
1485            if not self.current_gateways:
1486                m = self.begin_gateways.match(line)
1487                if m:
1488                    self.current_gateways = m.group(1)
1489                    if allocated.has_key(self.current_gateways):
1490                        # This test should always succeed
1491                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1492                        if not os.path.exists(tb_dir):
1493                            try:
1494                                os.mkdir(tb_dir)
1495                            except IOError:
1496                                raise service_error(service_error.internal,
1497                                        "Cannot create %s" % tb_dir)
1498                    else:
1499                        # XXX
1500                        self.log.error("[gateways]: Ignoring gateways for " + \
1501                                "unknown testbed %s" % self.current_gateways)
1502                        self.current_gateways = None
1503                    return True
1504                else:
1505                    return False
1506            else:
1507                m = self.end_gateways.match(line)
1508                if m :
1509                    if m.group(1) != self.current_gateways:
1510                        raise service_error(service_error.internal,
1511                                "Mismatched gateway markers!?")
1512                    if self.control_gateway:
1513                        try:
1514                            cc = open("%s/%s/client.conf" %
1515                                    (self.tmpdir, self.current_gateways), 'w')
1516                            print >>cc, "ControlGateway: %s" % \
1517                                    self.control_gateway
1518                            if tbparams[self.master].has_key('smbshare'):
1519                                print >>cc, "SMBSHare: %s" % \
1520                                        tbparams[self.master]['smbshare']
1521                            print >>cc, "ProjectUser: %s" % \
1522                                    tbparams[self.master]['user']
1523                            print >>cc, "ProjectName: %s" % \
1524                                    tbparams[self.master]['project']
1525                            print >>cc, "ExperimentID: %s/%s" % \
1526                                    ( tbparams[self.master]['project'], \
1527                                    self.eid )
1528                            cc.close()
1529                        except IOError:
1530                            raise service_error(service_error.internal,
1531                                    "Error creating client config")
1532                        # XXX: This seer specific file should disappear
1533                        try:
1534                            cc = open("%s/%s/seer.conf" %
1535                                    (self.tmpdir, self.current_gateways),
1536                                    'w')
1537                            if self.current_gateways != self.master:
1538                                print >>cc, "ControlNode: %s" % \
1539                                        self.control_gateway
1540                            print >>cc, "ExperimentID: %s/%s" % \
1541                                    ( tbparams[self.master]['project'], \
1542                                    self.eid )
1543                            cc.close()
1544                        except IOError:
1545                            raise service_error(service_error.internal,
1546                                    "Error creating seer config")
1547                    else:
1548                        debug.error("[gateways]: No control gateway for %s" %\
1549                                    self.current_gateways)
1550                    self.current_gateways = None
1551                else:
1552                    dtb, myname, desthost, type = line.split(" ")
1553
1554                    if type == "control" or type == "both":
1555                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1556                                self.eid, 
1557                                tbparams[self.current_gateways]['project'],
1558                                tbparams[self.current_gateways]['domain'])
1559                    try:
1560                        active = self.gateway_conf_file(self.current_gateways,
1561                                self.master, self.eid, self.gw_pubkey_base,
1562                                self.gw_secretkey_base,
1563                                self.active_end, tbparams, dtb, myname,
1564                                desthost, type)
1565                    except IOError, e:
1566                        raise service_error(service_error.internal,
1567                                "Failed to write config file for %s" % \
1568                                        self.current_gateway)
1569           
1570                    gw_pubkey = "%s/keys/%s" % \
1571                            (self.tmpdir, self.gw_pubkey_base)
1572                    gw_secretkey = "%s/keys/%s" % \
1573                            (self.tmpdir, self.gw_secretkey_base)
1574
1575                    pkfile = "%s/%s/%s" % \
1576                            ( self.tmpdir, self.current_gateways, 
1577                                    self.gw_pubkey_base)
1578                    skfile = "%s/%s/%s" % \
1579                            ( self.tmpdir, self.current_gateways, 
1580                                    self.gw_secretkey_base)
1581
1582                    if not os.path.exists(pkfile):
1583                        try:
1584                            self.copy_file(gw_pubkey, pkfile)
1585                        except IOError:
1586                            service_error(service_error.internal,
1587                                    "Failed to copy pubkey file")
1588
1589                    if active and not os.path.exists(skfile):
1590                        try:
1591                            self.copy_file(gw_secretkey, skfile)
1592                        except IOError:
1593                            service_error(service_error.internal,
1594                                    "Failed to copy secretkey file")
1595                return True
[6679c122]1596
1597    class shunt_to_file:
[866c983]1598        """
1599        Simple class to write data between two regexps to a file.
1600        """
1601        def __init__(self, begin, end, filename):
1602            """
1603            Begin shunting on a match of begin, stop on end, send data to
1604            filename.
1605            """
1606            self.begin = re.compile(begin)
1607            self.end = re.compile(end)
1608            self.in_shunt = False
1609            self.file = None
1610            self.filename = filename
1611
1612        def __call__(self, line):
1613            """
1614            Call this on each line in the input that may be shunted.
1615            """
1616            if not self.in_shunt:
1617                if self.begin.match(line):
1618                    self.in_shunt = True
1619                    try:
1620                        self.file = open(self.filename, "w")
1621                    except:
1622                        self.file = None
1623                        raise
1624                    return True
1625                else:
1626                    return False
1627            else:
1628                if self.end.match(line):
1629                    if self.file: 
1630                        self.file.close()
1631                        self.file = None
1632                    self.in_shunt = False
1633                else:
1634                    if self.file:
1635                        print >>self.file, line
1636                return True
[6679c122]1637
1638    class shunt_to_list:
[866c983]1639        """
1640        Same interface as shunt_to_file.  Data collected in self.list, one list
1641        element per line.
1642        """
1643        def __init__(self, begin, end):
1644            self.begin = re.compile(begin)
1645            self.end = re.compile(end)
1646            self.in_shunt = False
1647            self.list = [ ]
1648       
1649        def __call__(self, line):
1650            if not self.in_shunt:
1651                if self.begin.match(line):
1652                    self.in_shunt = True
1653                    return True
1654                else:
1655                    return False
1656            else:
1657                if self.end.match(line):
1658                    self.in_shunt = False
1659                else:
1660                    self.list.append(line)
1661                return True
[6679c122]1662
[0d830de]1663    class shunt_to_string:
[866c983]1664        """
1665        Same interface as shunt_to_file.  Data collected in self.str, all in
1666        one string.
1667        """
1668        def __init__(self, begin, end):
1669            self.begin = re.compile(begin)
1670            self.end = re.compile(end)
1671            self.in_shunt = False
1672            self.str = ""
1673       
1674        def __call__(self, line):
1675            if not self.in_shunt:
1676                if self.begin.match(line):
1677                    self.in_shunt = True
1678                    return True
1679                else:
1680                    return False
1681            else:
1682                if self.end.match(line):
1683                    self.in_shunt = False
1684                else:
1685                    self.str += line
1686                return True
[0d830de]1687
[bd3e314]1688    def allocate_resources(self, allocated, master, eid, expid, expcert, 
1689            tbparams, tmpdir, alloc_log=None):
1690        started = { }           # Testbeds where a sub-experiment started
1691                                # successfully
1692
1693        # XXX
1694        fail_soft = False
1695
1696        log = alloc_log or self.log
1697
1698        thread_pool = self.thread_pool(self.nthreads)
1699        threads = [ ]
1700
1701        for tb in [ k for k in allocated.keys() if k != master]:
1702            # Create and start a thread to start the segment, and save it to
1703            # get the return value later
1704            thread_pool.wait_for_slot()
1705            t  = self.pooled_thread(\
1706                    target=self.start_segment(log=log,
1707                        keyfile=self.ssh_privkey_file, debug=self.debug), 
1708                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1709                    pdata=thread_pool, trace_file=self.trace_file)
1710            threads.append(t)
1711            t.start()
1712
1713        # Wait until all finish
1714        thread_pool.wait_for_all_done()
1715
1716        # If none failed, start the master
1717        failed = [ t.getName() for t in threads if not t.rv ]
1718
1719        if len(failed) == 0:
1720            starter = self.start_segment(log=log, 
1721                    keyfile=self.ssh_privkey_file, debug=self.debug)
1722            if not starter(master, eid, tbparams, tmpdir):
1723                failed.append(master)
1724
1725        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1726        # If one failed clean up, unless fail_soft is set
1727        if failed:
1728            if not fail_soft:
1729                thread_pool.clear()
1730                for tb in succeeded:
1731                    # Create and start a thread to stop the segment
1732                    thread_pool.wait_for_slot()
1733                    t  = self.pooled_thread(\
1734                            target=self.stop_segment(log=log,
1735                                keyfile=self.ssh_privkey_file,
1736                                debug=self.debug), 
1737                            args=(tb, eid, tbparams), name=tb,
1738                            pdata=thread_pool, trace_file=self.trace_file)
1739                    t.start()
1740                # Wait until all finish
1741                thread_pool.wait_for_all_done()
1742
1743                # release the allocations
1744                for tb in tbparams.keys():
1745                    self.release_access(tb, tbparams[tb]['allocID'])
1746                # Remove the placeholder
1747                self.state_lock.acquire()
1748                self.state[eid]['experimentStatus'] = 'failed'
1749                if self.state_filename: self.write_state()
1750                self.state_lock.release()
1751
1752                #raise service_error(service_error.federant,
1753                #    "Swap in failed on %s" % ",".join(failed))
1754                log.error("Swap in failed on %s" % ",".join(failed))
1755                return
1756        else:
1757            log.info("[start_segment]: Experiment %s active" % eid)
1758
1759        log.debug("[start_experiment]: removing %s" % tmpdir)
1760
1761        # Walk up tmpdir, deleting as we go
1762        for path, dirs, files in os.walk(tmpdir, topdown=False):
1763            for f in files:
1764                os.remove(os.path.join(path, f))
1765            for d in dirs:
1766                os.rmdir(os.path.join(path, d))
1767        os.rmdir(tmpdir)
1768
1769        # Insert the experiment into our state and update the disk copy
1770        self.state_lock.acquire()
1771        self.state[expid]['experimentStatus'] = 'active'
1772        self.state[eid] = self.state[expid]
1773        if self.state_filename: self.write_state()
1774        self.state_lock.release()
1775        return
1776
[6679c122]1777    def create_experiment(self, req, fid):
[866c983]1778        """
1779        The external interface to experiment creation called from the
1780        dispatcher.
1781
1782        Creates a working directory, splits the incoming description using the
1783        splitter script and parses out the avrious subsections using the
1784        lcasses above.  Once each sub-experiment is created, use pooled threads
1785        to instantiate them and start it all up.
1786        """
1787
1788        if not self.auth.check_attribute(fid, 'create'):
1789            raise service_error(service_error.access, "Create access denied")
1790
1791        try:
1792            tmpdir = tempfile.mkdtemp(prefix="split-")
1793        except IOError:
1794            raise service_error(service_error.internal, "Cannot create tmp dir")
1795
1796        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1797        gw_secretkey_base = "fed.%s" % self.ssh_type
1798        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1799        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1800        tclfile = tmpdir + "/experiment.tcl"
1801        tbparams = { }
1802        try:
1803            access_user = self.accessdb[fid]
1804        except KeyError:
1805            raise service_error(service_error.internal,
1806                    "Access map and authorizer out of sync in " + \
1807                            "create_experiment for fedid %s"  % fid)
1808
1809        pid = "dummy"
1810        gid = "dummy"
1811        try:
1812            os.mkdir(tmpdir+"/keys")
1813        except OSError:
1814            raise service_error(service_error.internal,
1815                    "Can't make temporary dir")
1816
1817        req = req.get('CreateRequestBody', None)
1818        if not req:
1819            raise service_error(service_error.req,
1820                    "Bad request format (no CreateRequestBody)")
1821        # The tcl parser needs to read a file so put the content into that file
1822        descr=req.get('experimentdescription', None)
1823        if descr:
1824            file_content=descr.get('ns2description', None)
1825            if file_content:
1826                try:
1827                    f = open(tclfile, 'w')
1828                    f.write(file_content)
1829                    f.close()
1830                except IOError:
1831                    raise service_error(service_error.internal,
1832                            "Cannot write temp experiment description")
1833            else:
1834                raise service_error(service_error.req, 
1835                        "Only ns2descriptions supported")
1836        else:
1837            raise service_error(service_error.req, "No experiment description")
1838
[bd3e314]1839        # Generate an ID for the experiment (slice) and a certificate that the
1840        # allocator can use to prove they own it.  We'll ship it back through
1841        # the encrypted connection.
1842        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1843
[866c983]1844        if req.has_key('experimentID') and \
1845                req['experimentID'].has_key('localname'):
[9479343]1846            overwrite = False
[866c983]1847            eid = req['experimentID']['localname']
[9479343]1848            # If there's an old failed experiment here with the same local name
1849            # and accessible by this user, we'll overwrite it, otherwise we'll
1850            # fall through and do the collision avoidance.
1851            old_expid = self.get_experiment_fedid(eid)
1852            if old_expid and self.check_experiment_access(fid, old_expid):
1853                self.state_lock.acquire()
1854                status = self.state[eid].get('experimentStatus', None)
1855                if status and status == 'failed':
1856                    # remove the old access attribute
1857                    self.auth.unset_attribute(fid, old_expid)
1858                    overwrite = True
1859                    del self.state[eid]
1860                    del self.state[old_expid]
1861                self.state_lock.release()
[866c983]1862            self.state_lock.acquire()
[9479343]1863            while (self.state.has_key(eid) and not overwrite):
[866c983]1864                eid += random.choice(string.ascii_letters)
[bd3e314]1865            # Initial state
1866            self.state[eid] = {
1867                    'experimentID' : \
1868                            [ { 'localname' : eid }, {'fedid': expid } ],
1869                    'experimentStatus': 'starting',
1870                    'experimentAccess': { 'X509' : expcert },
1871                    'owner': fid,
1872                    'log' : [],
1873                }
1874            self.state[expid] = self.state[eid]
1875            if self.state_filename: self.write_state()
[866c983]1876            self.state_lock.release()
1877        else:
1878            eid = self.exp_stem
1879            for i in range(0,5):
1880                eid += random.choice(string.ascii_letters)
1881            self.state_lock.acquire()
1882            while (self.state.has_key(eid)):
1883                eid = self.exp_stem
1884                for i in range(0,5):
1885                    eid += random.choice(string.ascii_letters)
[bd3e314]1886            # Initial state
1887            self.state[eid] = {
1888                    'experimentID' : \
1889                            [ { 'localname' : eid }, {'fedid': expid } ],
1890                    'experimentStatus': 'starting',
1891                    'experimentAccess': { 'X509' : expcert },
1892                    'owner': fid,
1893                    'log' : [],
1894                }
1895            self.state[expid] = self.state[eid]
1896            if self.state_filename: self.write_state()
[866c983]1897            self.state_lock.release()
1898
1899        try: 
1900            # This catches exceptions to clear the placeholder if necessary
1901            try:
1902                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1903            except ValueError:
1904                raise service_error(service_error.server_config, 
1905                        "Bad key type (%s)" % self.ssh_type)
1906
1907            user = req.get('user', None)
1908            if user == None:
1909                raise service_error(service_error.req, "No user")
1910
1911            master = req.get('master', None)
1912            if not master:
1913                raise service_error(service_error.req,
1914                        "No master testbed label")
1915            export_project = req.get('exportProject', None)
1916            if not export_project:
1917                raise service_error(service_error.req, "No export project")
1918           
1919            if self.splitter_url:
1920                self.log.debug("Calling remote splitter at %s" % \
1921                        self.splitter_url)
1922                split_data = self.remote_splitter(self.splitter_url,
1923                        file_content, master)
1924            else:
1925                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1926                    str(self.muxmax), '-m', master]
1927
1928                if self.fedkit:
1929                    tclcmd.append('-k')
1930
1931                if self.gatewaykit:
1932                    tclcmd.append('-K')
1933
1934                tclcmd.extend([pid, gid, eid, tclfile])
1935
1936                self.log.debug("running local splitter %s", " ".join(tclcmd))
[70caa72]1937                # This is just fantastic.  As a side effect the parser copies
1938                # tb_compat.tcl into the current directory, so that directory
1939                # must be writable by the fedd user.  Doing this in the
1940                # temporary subdir ensures this is the case.
1941                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1942                        cwd=tmpdir)
[866c983]1943                split_data = tclparser.stdout
1944
1945            allocated = { }         # Testbeds we can access
1946            # Objects to parse the splitter output (defined above)
1947            parse_current_testbed = self.current_testbed(eid, tmpdir,
1948                    self.fedkit, self.gatewaykit)
1949            parse_allbeds = self.allbeds(self.get_access)
1950            parse_gateways = self.gateways(eid, master, tmpdir,
1951                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1952                    self.fedkit)
1953            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1954                        "^#\s+End\s+Vtopo")
1955            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1956                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1957            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1958                    "^#\s+End\s+tarfiles")
1959            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1960                    "^#\s+End\s+rpms")
1961
1962            # Working on the split data
1963            for line in split_data:
1964                line = line.rstrip()
1965                if parse_current_testbed(line, master, allocated, tbparams):
1966                    continue
1967                elif parse_allbeds(line, user, tbparams, master, export_project,
1968                        access_user):
1969                    continue
1970                elif parse_gateways(line, allocated, tbparams):
1971                    continue
1972                elif parse_vtopo(line):
1973                    continue
1974                elif parse_hostnames(line):
1975                    continue
1976                elif parse_tarfiles(line):
1977                    continue
1978                elif parse_rpms(line):
1979                    continue
1980                else:
1981                    raise service_error(service_error.internal, 
1982                            "Bad tcl parse? %s" % line)
1983            # Virtual topology and visualization
1984            vtopo = self.gentopo(parse_vtopo.str)
1985            if not vtopo:
1986                raise service_error(service_error.internal, 
1987                        "Failed to generate virtual topology")
1988
1989            vis = self.genviz(vtopo)
1990            if not vis:
1991                raise service_error(service_error.internal, 
1992                        "Failed to generate visualization")
[bd3e314]1993
[866c983]1994           
1995            # save federant information
1996            for k in allocated.keys():
1997                tbparams[k]['federant'] = {\
1998                        'name': [ { 'localname' : eid} ],\
1999                        'emulab': tbparams[k]['emulab'],\
2000                        'allocID' : tbparams[k]['allocID'],\
2001                        'master' : k == master,\
2002                    }
2003
[bd3e314]2004            self.state_lock.acquire()
2005            self.state[eid]['vtopo'] = vtopo
2006            self.state[eid]['vis'] = vis
2007            self.state[expid]['federant'] = \
2008                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2009                        if tbparams[tb].has_key('federant') ]
2010            if self.state_filename: self.write_state()
2011            self.state_lock.release()
[866c983]2012
2013            # Copy tarfiles and rpms needed at remote sites into a staging area
2014            try:
2015                if self.fedkit:
2016                    for t in self.fedkit:
2017                        parse_tarfiles.list.append(t[1])
2018                if self.gatewaykit:
2019                    for t in self.gatewaykit:
2020                        parse_tarfiles.list.append(t[1])
2021                for t in parse_tarfiles.list:
2022                    if not os.path.exists("%s/tarfiles" % tmpdir):
2023                        os.mkdir("%s/tarfiles" % tmpdir)
2024                    self.copy_file(t, "%s/tarfiles/%s" % \
2025                            (tmpdir, os.path.basename(t)))
2026                for r in parse_rpms.list:
2027                    if not os.path.exists("%s/rpms" % tmpdir):
2028                        os.mkdir("%s/rpms" % tmpdir)
2029                    self.copy_file(r, "%s/rpms/%s" % \
2030                            (tmpdir, os.path.basename(r)))
2031                # A null experiment file in case we need to create a remote
2032                # experiment from scratch
2033                f = open("%s/null.tcl" % tmpdir, "w")
2034                print >>f, """
[a0c12a6]2035set ns [new Simulator]
2036source tb_compat.tcl
2037
2038set a [$ns node]
2039
2040$ns rtproto Session
2041$ns run
2042"""
[866c983]2043                f.close()
2044
2045            except IOError, e:
2046                raise service_error(service_error.internal, 
2047                        "Cannot stage tarfile/rpm: %s" % e.strerror)
2048
2049        except service_error, e:
2050            # If something goes wrong in the parse (usually an access error)
2051            # clear the placeholder state.  From here on out the code delays
[bd3e314]2052            # exceptions.  Failing at this point returns a fault to the remote
2053            # caller.
[866c983]2054            self.state_lock.acquire()
2055            del self.state[eid]
[bd3e314]2056            del self.state[expid]
2057            if self.state_filename: self.write_state()
[866c983]2058            self.state_lock.release()
2059            raise e
2060
2061
[bd3e314]2062        # Start the background swapper and return the starting state.  From
2063        # here on out, the state will stick around a while.
[866c983]2064
[bd3e314]2065        # Let users touch the state
2066        self.auth.set_attribute(fid, expid)
2067        self.auth.set_attribute(expid, expid)
[ca489e8]2068        # Override fedids can manipulate state as well
2069        for o in self.overrides:
2070            self.auth.set_attribute(o, expid)
[866c983]2071
[bd3e314]2072        # Create a logger that logs to the experiment's state object as well as
2073        # to the main log file.
2074        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2075        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
2076        # XXX: there should be a global one of these rather than repeating the
2077        # code.
2078        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2079                    '%d %b %y %H:%M:%S'))
2080        alloc_log.addHandler(h)
2081       
2082        # Start a thread to do the resource allocation
2083        t  = Thread(target=self.allocate_resources,
2084                args=(allocated, master, eid, expid, expcert, tbparams, 
2085                    tmpdir, alloc_log),
2086                name=eid)
2087        t.start()
[866c983]2088
[bd3e314]2089        rv = {
2090                'experimentID': [
2091                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2092                ],
[a74ea78]2093                'experimentStatus': 'starting',
[bd3e314]2094                'experimentAccess': { 'X509' : expcert }
2095            }
[866c983]2096
[bd3e314]2097        return rv
[9479343]2098   
2099    def get_experiment_fedid(self, key):
2100        """
2101        find the fedid associated with the localname key in the state database.
2102        """
2103
2104        rv = None
2105        self.state_lock.acquire()
2106        if self.state.has_key(key):
2107            if isinstance(self.state[key], dict):
2108                try:
2109                    kl = [ f['fedid'] for f in \
2110                            self.state[key]['experimentID']\
2111                                if f.has_key('fedid') ]
2112                except KeyError:
2113                    self.state_lock.release()
2114                    raise service_error(service_error.internal, 
2115                            "No fedid for experiment %s when getting "+\
2116                                    "fedid(!?)" % key)
2117                if len(kl) == 1:
2118                    rv = kl[0]
2119                else:
2120                    self.state_lock.release()
2121                    raise service_error(service_error.internal, 
2122                            "multiple fedids for experiment %s when " +\
2123                                    "getting fedid(!?)" % key)
2124            else:
2125                self.state_lock.release()
2126                raise service_error(service_error.internal, 
2127                        "Unexpected state for %s" % key)
2128        self.state_lock.release()
2129        return rv
[a97394b]2130
[4064742]2131    def check_experiment_access(self, fid, key):
[866c983]2132        """
2133        Confirm that the fid has access to the experiment.  Though a request
2134        may be made in terms of a local name, the access attribute is always
2135        the experiment's fedid.
2136        """
2137        if not isinstance(key, fedid):
[9479343]2138            key = self.get_experiment_fedid(key)
[866c983]2139
2140        if self.auth.check_attribute(fid, key):
2141            return True
2142        else:
2143            raise service_error(service_error.access, "Access Denied")
[4064742]2144
2145
[987aaa1]2146
2147    def get_vtopo(self, req, fid):
[866c983]2148        """
2149        Return the stored virtual topology for this experiment
2150        """
2151        rv = None
[bd3e314]2152        state = None
[866c983]2153
2154        req = req.get('VtopoRequestBody', None)
2155        if not req:
2156            raise service_error(service_error.req,
2157                    "Bad request format (no VtopoRequestBody)")
2158        exp = req.get('experiment', None)
2159        if exp:
2160            if exp.has_key('fedid'):
2161                key = exp['fedid']
2162                keytype = "fedid"
2163            elif exp.has_key('localname'):
2164                key = exp['localname']
2165                keytype = "localname"
2166            else:
2167                raise service_error(service_error.req, "Unknown lookup type")
2168        else:
2169            raise service_error(service_error.req, "No request?")
2170
2171        self.check_experiment_access(fid, key)
2172
2173        self.state_lock.acquire()
2174        if self.state.has_key(key):
[bd3e314]2175            if self.state[key].has_key('vtopo'):
2176                rv = { 'experiment' : {keytype: key },\
2177                        'vtopo': self.state[key]['vtopo'],\
2178                    }
2179            else:
2180                state = self.state[key]['experimentStatus']
[866c983]2181        self.state_lock.release()
2182
2183        if rv: return rv
[bd3e314]2184        else: 
2185            if state:
2186                raise service_error(service_error.partial, 
2187                        "Not ready: %s" % state)
2188            else:
2189                raise service_error(service_error.req, "No such experiment")
[987aaa1]2190
2191    def get_vis(self, req, fid):
[866c983]2192        """
2193        Return the stored visualization for this experiment
2194        """
2195        rv = None
[bd3e314]2196        state = None
[866c983]2197
2198        req = req.get('VisRequestBody', None)
2199        if not req:
2200            raise service_error(service_error.req,
2201                    "Bad request format (no VisRequestBody)")
2202        exp = req.get('experiment', None)
2203        if exp:
2204            if exp.has_key('fedid'):
2205                key = exp['fedid']
2206                keytype = "fedid"
2207            elif exp.has_key('localname'):
2208                key = exp['localname']
2209                keytype = "localname"
2210            else:
2211                raise service_error(service_error.req, "Unknown lookup type")
2212        else:
2213            raise service_error(service_error.req, "No request?")
2214
2215        self.check_experiment_access(fid, key)
2216
2217        self.state_lock.acquire()
2218        if self.state.has_key(key):
[bd3e314]2219            if self.state[key].has_key('vis'):
2220                rv =  { 'experiment' : {keytype: key },\
2221                        'vis': self.state[key]['vis'],\
2222                        }
2223            else:
2224                state = self.state[key]['experimentStatus']
[866c983]2225        self.state_lock.release()
2226
2227        if rv: return rv
[bd3e314]2228        else:
2229            if state:
2230                raise service_error(service_error.partial, 
2231                        "Not ready: %s" % state)
2232            else:
2233                raise service_error(service_error.req, "No such experiment")
[987aaa1]2234
[65f3f29]2235    def clean_info_response(self, rv):
2236        """
2237        Remove the information in the experiment's state object that is not in
2238        the info response.
2239        """
2240        # Remove the owner info (should always be there, but...)
2241        if rv.has_key('owner'): del rv['owner']
2242
2243        # Convert the log into the allocationLog parameter and remove the
2244        # log entry (with defensive programming)
2245        if rv.has_key('log'):
2246            rv['allocationLog'] = "".join(rv['log'])
2247            del rv['log']
2248        else:
2249            rv['allocationLog'] = ""
2250
2251        if rv['experimentStatus'] != 'active':
2252            if rv.has_key('federant'): del rv['federant']
2253        else:
2254            # remove the allocationID info from each federant
2255            for f in rv.get('federant', []):
2256                if f.has_key('allocID'): del f['allocID']
2257        return rv
2258
[c52c48d]2259    def get_info(self, req, fid):
[866c983]2260        """
2261        Return all the stored info about this experiment
2262        """
2263        rv = None
2264
2265        req = req.get('InfoRequestBody', None)
2266        if not req:
2267            raise service_error(service_error.req,
[65f3f29]2268                    "Bad request format (no InfoRequestBody)")
[866c983]2269        exp = req.get('experiment', None)
2270        if exp:
2271            if exp.has_key('fedid'):
2272                key = exp['fedid']
2273                keytype = "fedid"
2274            elif exp.has_key('localname'):
2275                key = exp['localname']
2276                keytype = "localname"
2277            else:
2278                raise service_error(service_error.req, "Unknown lookup type")
2279        else:
2280            raise service_error(service_error.req, "No request?")
2281
2282        self.check_experiment_access(fid, key)
2283
2284        # The state may be massaged by the service function that called
2285        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2286        # state.
2287        self.state_lock.acquire()
2288        if self.state.has_key(key):
2289            rv = copy.deepcopy(self.state[key])
2290        self.state_lock.release()
2291
[bd3e314]2292        if rv:
[65f3f29]2293            return self.clean_info_response(rv)
[bd3e314]2294        else:
2295            raise service_error(service_error.req, "No such experiment")
[7a8d667]2296
[65f3f29]2297    def get_multi_info(self, req, fid):
2298        """
2299        Return all the stored info that this fedid can access
2300        """
2301        rv = { 'info': [ ] }
2302
2303        self.state_lock.acquire()
2304        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2305            self.check_experiment_access(fid, key)
2306
2307            if self.state.has_key(key):
2308                e = copy.deepcopy(self.state[key])
2309                e = self.clean_info_response(e)
2310                rv['info'].append(e)
2311        self.state_lock.release()
2312        return rv
2313
[7a8d667]2314
2315    def terminate_experiment(self, req, fid):
[866c983]2316        """
2317        Swap this experiment out on the federants and delete the shared
2318        information
2319        """
2320        tbparams = { }
2321        req = req.get('TerminateRequestBody', None)
2322        if not req:
2323            raise service_error(service_error.req,
2324                    "Bad request format (no TerminateRequestBody)")
[ca489e8]2325        force = req.get('force', False)
[866c983]2326        exp = req.get('experiment', None)
2327        if exp:
2328            if exp.has_key('fedid'):
2329                key = exp['fedid']
2330                keytype = "fedid"
2331            elif exp.has_key('localname'):
2332                key = exp['localname']
2333                keytype = "localname"
2334            else:
2335                raise service_error(service_error.req, "Unknown lookup type")
2336        else:
2337            raise service_error(service_error.req, "No request?")
2338
2339        self.check_experiment_access(fid, key)
2340
[46e4682]2341        dealloc_list = [ ]
2342
2343
2344        # Create a logger that logs to the dealloc_list as well as to the main
2345        # log file.
2346        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2347        h = logging.StreamHandler(self.list_log(dealloc_list))
2348        # XXX: there should be a global one of these rather than repeating the
2349        # code.
2350        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2351                    '%d %b %y %H:%M:%S'))
2352        dealloc_log.addHandler(h)
2353
[866c983]2354        self.state_lock.acquire()
2355        fed_exp = self.state.get(key, None)
2356
2357        if fed_exp:
2358            # This branch of the conditional holds the lock to generate a
2359            # consistent temporary tbparams variable to deallocate experiments.
2360            # It releases the lock to do the deallocations and reacquires it to
2361            # remove the experiment state when the termination is complete.
[bd3e314]2362
2363            # First make sure that the experiment creation is complete.
[a74ea78]2364            status = fed_exp.get('experimentStatus', None)
[46e4682]2365
[a74ea78]2366            if status:
[46e4682]2367                if status in ('starting', 'terminating'):
[ca489e8]2368                    if not force:
2369                        self.state_lock.release()
2370                        raise service_error(service_error.partial, 
[46e4682]2371                                'Experiment still being created or destroyed')
[ca489e8]2372                    else:
[46e4682]2373                        self.log.warning('Experiment in %s state ' % status + \
2374                                'being terminated by force.')
[bd3e314]2375            else:
2376                # No status??? trouble
2377                self.state_lock.release()
2378                raise service_error(service_error.internal,
2379                        "Experiment has no status!?")
2380
[866c983]2381            ids = []
2382            #  experimentID is a list of dicts that are self-describing
2383            #  identifiers.  This finds all the fedids and localnames - the
2384            #  keys of self.state - and puts them into ids.
2385            for id in fed_exp.get('experimentID', []):
2386                if id.has_key('fedid'): ids.append(id['fedid'])
2387                if id.has_key('localname'): ids.append(id['localname'])
2388
2389            # Construct enough of the tbparams to make the stop_segment calls
2390            # work
[bd3e314]2391            for fed in fed_exp.get('federant', []):
[866c983]2392                try:
2393                    for e in fed['name']:
2394                        eid = e.get('localname', None)
2395                        if eid: break
2396                    else:
2397                        continue
2398
2399                    p = fed['emulab']['project']
2400
2401                    project = p['name']['localname']
2402                    tb = p['testbed']['localname']
2403                    user = p['user'][0]['userID']['localname']
2404
2405                    domain = fed['emulab']['domain']
2406                    host  = fed['emulab']['ops']
2407                    aid = fed['allocID']
2408                except KeyError, e:
2409                    continue
2410                tbparams[tb] = {\
2411                        'user': user,\
2412                        'domain': domain,\
2413                        'project': project,\
2414                        'host': host,\
2415                        'eid': eid,\
2416                        'aid': aid,\
2417                    }
[46e4682]2418            fed_exp['experimentStatus'] = 'terminating'
2419            if self.state_filename: self.write_state()
[866c983]2420            self.state_lock.release()
2421
[46e4682]2422            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2423            # then completes, so we can't wait if nothing starts.  So, no
2424            # tbparams, no start.
2425            if len(tbparams) > 0:
2426                thread_pool = self.thread_pool(self.nthreads)
2427                for tb in tbparams.keys():
2428                    # Create and start a thread to stop the segment
2429                    thread_pool.wait_for_slot()
2430                    t  = self.pooled_thread(\
2431                            target=self.stop_segment(log=dealloc_log,
2432                                keyfile=self.ssh_privkey_file, debug=self.debug), 
2433                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
2434                            pdata=thread_pool, trace_file=self.trace_file)
2435                    t.start()
2436                # Wait for completions
2437                thread_pool.wait_for_all_done()
[866c983]2438
[a74ea78]2439            # release the allocations (failed experiments have done this
[ca489e8]2440            # already, and starting experiments may be in odd states, so we
2441            # ignore errors releasing those allocations
2442            try: 
[a74ea78]2443                for tb in tbparams.keys():
2444                    self.release_access(tb, tbparams[tb]['aid'])
[ca489e8]2445            except service_error, e:
2446                if status != 'failed' and not force:
2447                    raise e
[866c983]2448
2449            # Remove the terminated experiment
2450            self.state_lock.acquire()
2451            for id in ids:
2452                if self.state.has_key(id): del self.state[id]
2453
2454            if self.state_filename: self.write_state()
2455            self.state_lock.release()
2456
[46e4682]2457            return { 
2458                    'experiment': exp , 
2459                    'deallocationLog': "".join(dealloc_list),
2460                    }
[866c983]2461        else:
2462            # Don't forget to release the lock
2463            self.state_lock.release()
2464            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.