source: fedd/federation/experiment_control.py @ f07fa49

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since f07fa49 was f07fa49, checked in by Ted Faber <faber@…>, 15 years ago

better logging and cleanup

  • Property mode set to 100644
File size: 73.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self):
123            """
124            Wait until all active threads finish (and at least one has started)
125            """
126            self.acquire()
127            while self.started == 0 or self.started > self.terminated:
128                self.wait()
129            self.release()
130
131    class pooled_thread(Thread):
132        """
133        One of a set of threads dedicated to a specific task.  Uses the
134        thread_pool class above for coordination.
135        """
136        def __init__(self, group=None, target=None, name=None, args=(), 
137                kwargs={}, pdata=None, trace_file=None):
138            Thread.__init__(self, group, target, name, args, kwargs)
139            self.rv = None          # Return value of the ops in this thread
140            self.exception = None   # Exception that terminated this thread
141            self.target=target      # Target function to run on start()
142            self.args = args        # Args to pass to target
143            self.kwargs = kwargs    # Additional kw args
144            self.pdata = pdata      # thread_pool for this class
145            # Logger for this thread
146            self.log = logging.getLogger("fedd.experiment_control")
147       
148        def run(self):
149            """
150            Emulate Thread.run, except add pool data manipulation and error
151            logging.
152            """
153            if self.pdata:
154                self.pdata.start()
155
156            if self.target:
157                try:
158                    self.rv = self.target(*self.args, **self.kwargs)
159                except service_error, s:
160                    self.exception = s
161                    self.log.error("Thread exception: %s %s" % \
162                            (s.code_string(), s.desc))
163                except:
164                    self.exception = sys.exc_info()[1]
165                    self.log.error(("Unexpected thread exception: %s" +\
166                            "Trace %s") % (self.exception,\
167                                traceback.format_exc()))
168            if self.pdata:
169                self.pdata.terminate()
170
171    call_RequestAccess = service_caller('RequestAccess')
172    call_ReleaseAccess = service_caller('ReleaseAccess')
173    call_StartSegment = service_caller('StartSegment')
174    call_TerminateSegment = service_caller('TerminateSegment')
175    call_Ns2Split = service_caller('Ns2Split')
176
177    def __init__(self, config=None, auth=None):
178        """
179        Intialize the various attributes, most from the config object
180        """
181
182        def parse_tarfile_list(tf):
183            """
184            Parse a tarfile list from the configuration.  This is a set of
185            paths and tarfiles separated by spaces.
186            """
187            rv = [ ]
188            if tf is not None:
189                tl = tf.split()
190                while len(tl) > 1:
191                    p, t = tl[0:2]
192                    del tl[0:2]
193                    rv.append((p, t))
194            return rv
195
196        self.thread_with_rv = experiment_control_local.pooled_thread
197        self.thread_pool = experiment_control_local.thread_pool
198        self.list_log = list_log.list_log
199
200        self.cert_file = config.get("experiment_control", "cert_file")
201        if self.cert_file:
202            self.cert_pwd = config.get("experiment_control", "cert_pwd")
203        else:
204            self.cert_file = config.get("globals", "cert_file")
205            self.cert_pwd = config.get("globals", "cert_pwd")
206
207        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
208                or config.get("globals", "trusted_certs")
209
210        self.repodir = config.get("experiment_control", "repodir")
211
212        self.exp_stem = "fed-stem"
213        self.log = logging.getLogger("fedd.experiment_control")
214        set_log_level(config, "experiment_control", self.log)
215        self.muxmax = 2
216        self.nthreads = 2
217        self.randomize_experiments = False
218
219        self.splitter = None
220        self.ssh_keygen = "/usr/bin/ssh-keygen"
221        self.ssh_identity_file = None
222
223
224        self.debug = config.getboolean("experiment_control", "create_debug")
225        self.state_filename = config.get("experiment_control", 
226                "experiment_state")
227        self.splitter_url = config.get("experiment_control", "splitter_uri")
228        self.fedkit = parse_tarfile_list(\
229                config.get("experiment_control", "fedkit"))
230        self.gatewaykit = parse_tarfile_list(\
231                config.get("experiment_control", "gatewaykit"))
232        accessdb_file = config.get("experiment_control", "accessdb")
233
234        self.ssh_pubkey_file = config.get("experiment_control", 
235                "ssh_pubkey_file")
236        self.ssh_privkey_file = config.get("experiment_control",
237                "ssh_privkey_file")
238        # NB for internal master/slave ops, not experiment setup
239        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
240
241        self.overrides = set([])
242        ovr = config.get('experiment_control', 'overrides')
243        if ovr:
244            for o in ovr.split(","):
245                o = o.strip()
246                if o.startswith('fedid:'): o = o[len('fedid:'):]
247                self.overrides.add(fedid(hexstr=o))
248
249        self.state = { }
250        self.state_lock = Lock()
251        self.tclsh = "/usr/local/bin/otclsh"
252        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
253                config.get("experiment_control", "tcl_splitter",
254                        "/usr/testbed/lib/ns2ir/parse.tcl")
255        mapdb_file = config.get("experiment_control", "mapdb")
256        self.trace_file = sys.stderr
257
258        self.def_expstart = \
259                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
260                "/tmp/federate";
261        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
262                "FEDDIR/hosts";
263        self.def_gwstart = \
264                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
265                "/tmp/bridge.log";
266        self.def_mgwstart = \
267                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
268                "/tmp/bridge.log";
269        self.def_gwimage = "FBSD61-TUNNEL2";
270        self.def_gwtype = "pc";
271        self.local_access = { }
272
273        if auth:
274            self.auth = auth
275        else:
276            self.log.error(\
277                    "[access]: No authorizer initialized, creating local one.")
278            auth = authorizer()
279
280
281        if self.ssh_pubkey_file:
282            try:
283                f = open(self.ssh_pubkey_file, 'r')
284                self.ssh_pubkey = f.read()
285                f.close()
286            except IOError:
287                raise service_error(service_error.internal,
288                        "Cannot read sshpubkey")
289        else:
290            raise service_error(service_error.internal, 
291                    "No SSH public key file?")
292
293        if not self.ssh_privkey_file:
294            raise service_error(service_error.internal, 
295                    "No SSH public key file?")
296
297
298        if mapdb_file:
299            self.read_mapdb(mapdb_file)
300        else:
301            self.log.warn("[experiment_control] No testbed map, using defaults")
302            self.tbmap = { 
303                    'deter':'https://users.isi.deterlab.net:23235',
304                    'emulab':'https://users.isi.deterlab.net:23236',
305                    'ucb':'https://users.isi.deterlab.net:23237',
306                    }
307
308        if accessdb_file:
309                self.read_accessdb(accessdb_file)
310        else:
311            raise service_error(service_error.internal,
312                    "No accessdb specified in config")
313
314        # Grab saved state.  OK to do this w/o locking because it's read only
315        # and only one thread should be in existence that can see self.state at
316        # this point.
317        if self.state_filename:
318            self.read_state()
319
320        # Dispatch tables
321        self.soap_services = {\
322                'Create': soap_handler('Create', self.create_experiment),
323                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
324                'Vis': soap_handler('Vis', self.get_vis),
325                'Info': soap_handler('Info', self.get_info),
326                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
327                'Terminate': soap_handler('Terminate', 
328                    self.terminate_experiment),
329        }
330
331        self.xmlrpc_services = {\
332                'Create': xmlrpc_handler('Create', self.create_experiment),
333                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
334                'Vis': xmlrpc_handler('Vis', self.get_vis),
335                'Info': xmlrpc_handler('Info', self.get_info),
336                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
337                'Terminate': xmlrpc_handler('Terminate',
338                    self.terminate_experiment),
339        }
340
341    def copy_file(self, src, dest, size=1024):
342        """
343        Exceedingly simple file copy.
344        """
345        s = open(src,'r')
346        d = open(dest, 'w')
347
348        buf = "x"
349        while buf != "":
350            buf = s.read(size)
351            d.write(buf)
352        s.close()
353        d.close()
354
355    # Call while holding self.state_lock
356    def write_state(self):
357        """
358        Write a new copy of experiment state after copying the existing state
359        to a backup.
360
361        State format is a simple pickling of the state dictionary.
362        """
363        if os.access(self.state_filename, os.W_OK):
364            self.copy_file(self.state_filename, \
365                    "%s.bak" % self.state_filename)
366        try:
367            f = open(self.state_filename, 'w')
368            pickle.dump(self.state, f)
369        except IOError, e:
370            self.log.error("Can't write file %s: %s" % \
371                    (self.state_filename, e))
372        except pickle.PicklingError, e:
373            self.log.error("Pickling problem: %s" % e)
374        except TypeError, e:
375            self.log.error("Pickling problem (TypeError): %s" % e)
376
377    # Call while holding self.state_lock
378    def read_state(self):
379        """
380        Read a new copy of experiment state.  Old state is overwritten.
381
382        State format is a simple pickling of the state dictionary.
383        """
384       
385        def get_experiment_id(state):
386            """
387            Pull the fedid experimentID out of the saved state.  This is kind
388            of a gross walk through the dict.
389            """
390
391            if state.has_key('experimentID'):
392                for e in state['experimentID']:
393                    if e.has_key('fedid'):
394                        return e['fedid']
395                else:
396                    return None
397            else:
398                return None
399
400        def get_alloc_ids(state):
401            """
402            Pull the fedids of the identifiers of each allocation from the
403            state.  Again, a dict dive that's best isolated.
404            """
405
406            return [ f['allocID']['fedid'] 
407                    for f in state.get('federant',[]) \
408                        if f.has_key('allocID') and \
409                            f['allocID'].has_key('fedid')]
410
411
412        try:
413            f = open(self.state_filename, "r")
414            self.state = pickle.load(f)
415            self.log.debug("[read_state]: Read state from %s" % \
416                    self.state_filename)
417        except IOError, e:
418            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
419                    % (self.state_filename, e))
420        except pickle.UnpicklingError, e:
421            self.log.warning(("[read_state]: No saved state: " + \
422                    "Unpickling failed: %s") % e)
423       
424        for s in self.state.values():
425            try:
426
427                eid = get_experiment_id(s)
428                if eid : 
429                    # Give the owner rights to the experiment
430                    self.auth.set_attribute(s['owner'], eid)
431                    # And holders of the eid as well
432                    self.auth.set_attribute(eid, eid)
433                    # allow overrides to control experiments as well
434                    for o in self.overrides:
435                        self.auth.set_attribute(o, eid)
436                    # Set permissions to allow reading of the software repo, if
437                    # any, as well.
438                    for a in get_alloc_ids(s):
439                        self.auth.set_attribute(a, 'repo/%s' % eid)
440                else:
441                    raise KeyError("No experiment id")
442            except KeyError, e:
443                self.log.warning("[read_state]: State ownership or identity " +\
444                        "misformatted in %s: %s" % (self.state_filename, e))
445
446
447    def read_accessdb(self, accessdb_file):
448        """
449        Read the mapping from fedids that can create experiments to their name
450        in the 3-level access namespace.  All will be asserted from this
451        testbed and can include the local username and porject that will be
452        asserted on their behalf by this fedd.  Each fedid is also added to the
453        authorization system with the "create" attribute.
454        """
455        self.accessdb = {}
456        # These are the regexps for parsing the db
457        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
458        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
459                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
460        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\s*(" + name_expr + ")\s*$")
462        lineno = 0
463
464        # Parse the mappings and store in self.authdb, a dict of
465        # fedid -> (proj, user)
466        try:
467            f = open(accessdb_file, "r")
468            for line in f:
469                lineno += 1
470                line = line.strip()
471                if len(line) == 0 or line.startswith('#'):
472                    continue
473                m = project_line.match(line)
474                if m:
475                    fid = fedid(hexstr=m.group(1))
476                    project, user = m.group(2,3)
477                    if not self.accessdb.has_key(fid):
478                        self.accessdb[fid] = []
479                    self.accessdb[fid].append((project, user))
480                    continue
481
482                m = user_line.match(line)
483                if m:
484                    fid = fedid(hexstr=m.group(1))
485                    project = None
486                    user = m.group(2)
487                    if not self.accessdb.has_key(fid):
488                        self.accessdb[fid] = []
489                    self.accessdb[fid].append((project, user))
490                    continue
491                self.log.warn("[experiment_control] Error parsing access " +\
492                        "db %s at line %d" %  (accessdb_file, lineno))
493        except IOError:
494            raise service_error(service_error.internal,
495                    "Error opening/reading %s as experiment " +\
496                            "control accessdb" %  accessdb_file)
497        f.close()
498
499        # Initialize the authorization attributes
500        for fid in self.accessdb.keys():
501            self.auth.set_attribute(fid, 'create')
502
503    def read_mapdb(self, file):
504        """
505        Read a simple colon separated list of mappings for the
506        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
507        """
508
509        self.tbmap = { }
510        lineno =0
511        try:
512            f = open(file, "r")
513            for line in f:
514                lineno += 1
515                line = line.strip()
516                if line.startswith('#') or len(line) == 0:
517                    continue
518                try:
519                    label, url = line.split(':', 1)
520                    self.tbmap[label] = url
521                except ValueError, e:
522                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
523                            "map db: %s %s" % (lineno, line, e))
524        except IOError, e:
525            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
526                    "open %s: %s" % (file, e))
527        f.close()
528       
529    def generate_ssh_keys(self, dest, type="rsa" ):
530        """
531        Generate a set of keys for the gateways to use to talk.
532
533        Keys are of type type and are stored in the required dest file.
534        """
535        valid_types = ("rsa", "dsa")
536        t = type.lower();
537        if t not in valid_types: raise ValueError
538        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
539
540        try:
541            trace = open("/dev/null", "w")
542        except IOError:
543            raise service_error(service_error.internal,
544                    "Cannot open /dev/null??");
545
546        # May raise CalledProcessError
547        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
548        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
549        if rv != 0:
550            raise service_error(service_error.internal, 
551                    "Cannot generate nonce ssh keys.  %s return code %d" \
552                            % (self.ssh_keygen, rv))
553
554    def gentopo(self, str):
555        """
556        Generate the topology dtat structure from the splitter's XML
557        representation of it.
558
559        The topology XML looks like:
560            <experiment>
561                <nodes>
562                    <node><vname></vname><ips>ip1:ip2</ips></node>
563                </nodes>
564                <lans>
565                    <lan>
566                        <vname></vname><vnode></vnode><ip></ip>
567                        <bandwidth></bandwidth><member>node:port</member>
568                    </lan>
569                </lans>
570        """
571        class topo_parse:
572            """
573            Parse the topology XML and create the dats structure.
574            """
575            def __init__(self):
576                # Typing of the subelements for data conversion
577                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
578                self.int_subelements = ( 'bandwidth',)
579                self.float_subelements = ( 'delay',)
580                # The final data structure
581                self.nodes = [ ]
582                self.lans =  [ ]
583                self.topo = { \
584                        'node': self.nodes,\
585                        'lan' : self.lans,\
586                    }
587                self.element = { }  # Current element being created
588                self.chars = ""     # Last text seen
589
590            def end_element(self, name):
591                # After each sub element the contents is added to the current
592                # element or to the appropriate list.
593                if name == 'node':
594                    self.nodes.append(self.element)
595                    self.element = { }
596                elif name == 'lan':
597                    self.lans.append(self.element)
598                    self.element = { }
599                elif name in self.str_subelements:
600                    self.element[name] = self.chars
601                    self.chars = ""
602                elif name in self.int_subelements:
603                    self.element[name] = int(self.chars)
604                    self.chars = ""
605                elif name in self.float_subelements:
606                    self.element[name] = float(self.chars)
607                    self.chars = ""
608
609            def found_chars(self, data):
610                self.chars += data.rstrip()
611
612
613        tp = topo_parse();
614        parser = xml.parsers.expat.ParserCreate()
615        parser.EndElementHandler = tp.end_element
616        parser.CharacterDataHandler = tp.found_chars
617
618        parser.Parse(str)
619
620        return tp.topo
621       
622
623    def genviz(self, topo):
624        """
625        Generate the visualization the virtual topology
626        """
627
628        neato = "/usr/local/bin/neato"
629        # These are used to parse neato output and to create the visualization
630        # file.
631        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
632        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
633                "%s</type></node>"
634
635        try:
636            # Node names
637            nodes = [ n['vname'] for n in topo['node'] ]
638            topo_lans = topo['lan']
639        except KeyError, e:
640            raise service_error(service_error.internal, "Bad topology: %s" %e)
641
642        lans = { }
643        links = { }
644
645        # Walk through the virtual topology, organizing the connections into
646        # 2-node connections (links) and more-than-2-node connections (lans).
647        # When a lan is created, it's added to the list of nodes (there's a
648        # node in the visualization for the lan).
649        for l in topo_lans:
650            if links.has_key(l['vname']):
651                if len(links[l['vname']]) < 2:
652                    links[l['vname']].append(l['vnode'])
653                else:
654                    nodes.append(l['vname'])
655                    lans[l['vname']] = links[l['vname']]
656                    del links[l['vname']]
657                    lans[l['vname']].append(l['vnode'])
658            elif lans.has_key(l['vname']):
659                lans[l['vname']].append(l['vnode'])
660            else:
661                links[l['vname']] = [ l['vnode'] ]
662
663
664        # Open up a temporary file for dot to turn into a visualization
665        try:
666            df, dotname = tempfile.mkstemp()
667            dotfile = os.fdopen(df, 'w')
668        except IOError:
669            raise service_error(service_error.internal,
670                    "Failed to open file in genviz")
671
672        try:
673            dnull = open('/dev/null', 'w')
674        except IOError:
675            service_error(service_error.internal,
676                    "Failed to open /dev/null in genviz")
677
678        # Generate a dot/neato input file from the links, nodes and lans
679        try:
680            print >>dotfile, "graph G {"
681            for n in nodes:
682                print >>dotfile, '\t"%s"' % n
683            for l in links.keys():
684                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
685            for l in lans.keys():
686                for n in lans[l]:
687                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
688            print >>dotfile, "}"
689            dotfile.close()
690        except TypeError:
691            raise service_error(service_error.internal,
692                    "Single endpoint link in vtopo")
693        except IOError:
694            raise service_error(service_error.internal, "Cannot write dot file")
695
696        # Use dot to create a visualization
697        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
698                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
699                close_fds=True)
700        dnull.close()
701
702        # Translate dot to vis format
703        vis_nodes = [ ]
704        vis = { 'node': vis_nodes }
705        for line in dot.stdout:
706            m = vis_re.match(line)
707            if m:
708                vn = m.group(1)
709                vis_node = {'name': vn, \
710                        'x': float(m.group(2)),\
711                        'y' : float(m.group(3)),\
712                    }
713                if vn in links.keys() or vn in lans.keys():
714                    vis_node['type'] = 'lan'
715                else:
716                    vis_node['type'] = 'node'
717                vis_nodes.append(vis_node)
718        rv = dot.wait()
719
720        os.remove(dotname)
721        if rv == 0 : return vis
722        else: return None
723
724    def get_access(self, tb, nodes, user, tbparam, master, export_project,
725            access_user):
726        """
727        Get access to testbed through fedd and set the parameters for that tb
728        """
729        uri = self.tbmap.get(tb, None)
730        if not uri:
731            raise service_error(serice_error.server_config, 
732                    "Unknown testbed: %s" % tb)
733
734        # currently this lumps all users into one service access group
735        service_keys = [ a for u in user \
736                for a in u.get('access', []) \
737                    if a.has_key('sshPubkey')]
738
739        if len(service_keys) == 0:
740            raise service_error(service_error.req, 
741                    "Must have at least one SSH pubkey for services")
742
743
744        for p, u in access_user:
745            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
746                    "to %s") %  ((p or "None"), u, uri))
747
748            if p:
749                # Request with user and project specified
750                req = {\
751                        'destinationTestbed' : { 'uri' : uri },
752                        'project': { 
753                            'name': {'localname': p},
754                            'user': [ {'userID': { 'localname': u } } ],
755                            },
756                        'user':  user,
757                        'allocID' : { 'localname': 'test' },
758                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
759                        'serviceAccess' : service_keys
760                    }
761            else:
762                # Request with only user specified
763                req = {\
764                        'destinationTestbed' : { 'uri' : uri },
765                        'user':  [ {'userID': { 'localname': u } } ],
766                        'allocID' : { 'localname': 'test' },
767                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
768                        'serviceAccess' : service_keys
769                    }
770
771            if tb == master:
772                # NB, the export_project parameter is a dict that includes
773                # the type
774                req['exportProject'] = export_project
775
776            # node resources if any
777            if nodes != None and len(nodes) > 0:
778                rnodes = [ ]
779                for n in nodes:
780                    rn = { }
781                    image, hw, count = n.split(":")
782                    if image: rn['image'] = [ image ]
783                    if hw: rn['hardware'] = [ hw ]
784                    if count and int(count) >0 : rn['count'] = int(count)
785                    rnodes.append(rn)
786                req['resources']= { }
787                req['resources']['node'] = rnodes
788
789            try:
790                if self.local_access.has_key(uri):
791                    # Local access call
792                    req = { 'RequestAccessRequestBody' : req }
793                    r = self.local_access[uri].RequestAccess(req, 
794                            fedid(file=self.cert_file))
795                    r = { 'RequestAccessResponseBody' : r }
796                else:
797                    r = self.call_RequestAccess(uri, req, 
798                            self.cert_file, self.cert_pwd, self.trusted_certs)
799            except service_error, e:
800                if e.code == service_error.access:
801                    self.log.debug("[get_access] Access denied")
802                    r = None
803                    continue
804                else:
805                    raise e
806
807            if r.has_key('RequestAccessResponseBody'):
808                # Through to here we have a valid response, not a fault.
809                # Access denied is a fault, so something better or worse than
810                # access denied has happened.
811                r = r['RequestAccessResponseBody']
812                self.log.debug("[get_access] Access granted")
813                break
814            else:
815                raise service_error(service_error.protocol,
816                        "Bad proxy response")
817       
818        if not r:
819            raise service_error(service_error.access, 
820                    "Access denied by %s (%s)" % (tb, uri))
821
822        e = r['emulab']
823        p = e['project']
824        tbparam[tb] = { 
825                "boss": e['boss'],
826                "host": e['ops'],
827                "domain": e['domain'],
828                "fs": e['fileServer'],
829                "eventserver": e['eventServer'],
830                "project": unpack_id(p['name']),
831                "emulab" : e,
832                "allocID" : r['allocID'],
833                }
834        # Make the testbed name be the label the user applied
835        p['testbed'] = {'localname': tb }
836
837        for u in p['user']:
838            role = u.get('role', None)
839            if role == 'experimentCreation':
840                tbparam[tb]['user'] = unpack_id(u['userID'])
841                break
842        else:
843            raise service_error(service_error.internal, 
844                    "No createExperimentUser from %s" %tb)
845
846        # Add attributes to barameter space.  We don't allow attributes to
847        # overlay any parameters already installed.
848        for a in e['fedAttr']:
849            try:
850                if a['attribute'] and isinstance(a['attribute'], basestring)\
851                        and not tbparam[tb].has_key(a['attribute'].lower()):
852                    tbparam[tb][a['attribute'].lower()] = a['value']
853            except KeyError:
854                self.log.error("Bad attribute in response: %s" % a)
855       
856    def release_access(self, tb, aid):
857        """
858        Release access to testbed through fedd
859        """
860
861        uri = self.tbmap.get(tb, None)
862        if not uri:
863            raise service_error(serice_error.server_config, 
864                    "Unknown testbed: %s" % tb)
865
866        if self.local_access.has_key(uri):
867            resp = self.local_access[uri].ReleaseAccess(\
868                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
869                    fedid(file=self.cert_file))
870            resp = { 'ReleaseAccessResponseBody': resp } 
871        else:
872            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
873                    self.cert_file, self.cert_pwd, self.trusted_certs)
874
875        # better error coding
876
877    def remote_splitter(self, uri, desc, master):
878
879        req = {
880                'description' : { 'ns2description': desc },
881                'master': master,
882                'include_fedkit': bool(self.fedkit),
883                'include_gatewaykit': bool(self.gatewaykit)
884            }
885
886        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
887                self.trusted_certs)
888
889        if r.has_key('Ns2SplitResponseBody'):
890            r = r['Ns2SplitResponseBody']
891            if r.has_key('output'):
892                return r['output'].splitlines()
893            else:
894                raise service_error(service_error.protocol, 
895                        "Bad splitter response (no output)")
896        else:
897            raise service_error(service_error.protocol, "Bad splitter response")
898
899    class start_segment:
900        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
901                cert_pwd=None, trusted_certs=None, caller=None,
902                log_collector=None):
903            self.log = log
904            self.debug = debug
905            self.cert_file = cert_file
906            self.cert_pwd = cert_pwd
907            self.trusted_certs = None
908            self.caller = caller
909            self.testbed = testbed
910            self.log_collector = log_collector
911
912        def __call__(self, uri, aid, topo, master, attrs=None):
913            req = {
914                    'allocID': { 'fedid' : aid }, 
915                    'segmentdescription': { 
916                        'topdldescription': topo.to_dict(),
917                    },
918                    'master': master,
919                }
920            if attrs:
921                req['fedAttr'] = attrs
922
923            try:
924                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
925                        self.trusted_certs)
926                if r.has_key('StartSegmentResponseBody'):
927                    lval = r['StartSegmentResponseBody'].get('allocationLog',
928                            None)
929                    if lval and self.log_collector:
930                        for line in  lval.splitlines(True):
931                            self.log_collector.write(line)
932                else:
933                    raise service_error(service_error.internal, 
934                            "Bad response!?: %s" %r)
935                return True
936            except service_error, e:
937                self.log.error("Start segment failed on %s: %s" % \
938                        (self.testbed, e))
939                return False
940
941
942
943    class terminate_segment:
944        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
945                cert_pwd=None, trusted_certs=None, caller=None):
946            self.log = log
947            self.debug = debug
948            self.cert_file = cert_file
949            self.cert_pwd = cert_pwd
950            self.trusted_certs = None
951            self.caller = caller
952            self.testbed = testbed
953
954        def __call__(self, uri, aid ):
955            req = {
956                    'allocID': aid , 
957                }
958            try:
959                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
960                        self.trusted_certs)
961                return True
962            except service_error, e:
963                self.log.error("Terminate segment failed on %s: %s" % \
964                        (self.testbed, e))
965                return False
966   
967
968    def allocate_resources(self, allocated, master, eid, expid, expcert, 
969            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
970            attrs=None):
971        started = { }           # Testbeds where a sub-experiment started
972                                # successfully
973
974        # XXX
975        fail_soft = False
976
977        log = alloc_log or self.log
978
979        thread_pool = self.thread_pool(self.nthreads)
980        threads = [ ]
981
982        for tb in [ k for k in allocated.keys() if k != master]:
983            # Create and start a thread to start the segment, and save it to
984            # get the return value later
985            thread_pool.wait_for_slot()
986            uri = self.tbmap.get(tb, None)
987            if not uri:
988                raise service_error(service_error.internal, 
989                        "Unknown testbed %s !?" % tb)
990
991            if tbparams[tb].has_key('allocID') and \
992                    tbparams[tb]['allocID'].has_key('fedid'):
993                aid = tbparams[tb]['allocID']['fedid']
994            else:
995                raise service_error(service_error.internal, 
996                        "No alloc id for testbed %s !?" % tb)
997
998            t  = self.pooled_thread(\
999                    target=self.start_segment(log=log, debug=self.debug,
1000                        testbed=tb, cert_file=self.cert_file,
1001                        cert_pwd=self.cert_pwd,
1002                        trusted_certs=self.trusted_certs,
1003                        caller=self.call_StartSegment,
1004                        log_collector=log_collector), 
1005                    args=(uri, aid, topo[tb], False, attrs), name=tb,
1006                    pdata=thread_pool, trace_file=self.trace_file)
1007            threads.append(t)
1008            t.start()
1009
1010        # Wait until all finish
1011        thread_pool.wait_for_all_done()
1012
1013        # If none failed, start the master
1014        failed = [ t.getName() for t in threads if not t.rv ]
1015
1016        if len(failed) == 0:
1017            uri = self.tbmap.get(master, None)
1018            if not uri:
1019                raise service_error(service_error.internal, 
1020                        "Unknown testbed %s !?" % master)
1021
1022            if tbparams[master].has_key('allocID') and \
1023                    tbparams[master]['allocID'].has_key('fedid'):
1024                aid = tbparams[master]['allocID']['fedid']
1025            else:
1026                raise service_error(service_error.internal, 
1027                    "No alloc id for testbed %s !?" % master)
1028            starter = self.start_segment(log=log, debug=self.debug,
1029                    testbed=master, cert_file=self.cert_file,
1030                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1031                    caller=self.call_StartSegment,
1032                    log_collector=log_collector)
1033            if not starter(uri, aid, topo[master], True, attrs):
1034                failed.append(master)
1035
1036        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1037        # If one failed clean up, unless fail_soft is set
1038        if failed and False:
1039            if not fail_soft:
1040                thread_pool.clear()
1041                for tb in succeeded:
1042                    # Create and start a thread to stop the segment
1043                    thread_pool.wait_for_slot()
1044                    t  = self.pooled_thread(\
1045                            target=self.stop_segment(log=log,
1046                                testbed=tb,
1047                                keyfile=self.ssh_privkey_file,
1048                                debug=self.debug), 
1049                            args=(tb, eid, tbparams), name=tb,
1050                            pdata=thread_pool, trace_file=self.trace_file)
1051                    t.start()
1052                # Wait until all finish
1053                thread_pool.wait_for_all_done()
1054
1055                # release the allocations
1056                for tb in tbparams.keys():
1057                    self.release_access(tb, tbparams[tb]['allocID'])
1058                # Remove the placeholder
1059                self.state_lock.acquire()
1060                self.state[eid]['experimentStatus'] = 'failed'
1061                if self.state_filename: self.write_state()
1062                self.state_lock.release()
1063
1064                log.error("Swap in failed on %s" % ",".join(failed))
1065                return
1066        else:
1067            log.info("[start_segment]: Experiment %s active" % eid)
1068
1069        log.debug("[start_experiment]: removing %s" % tmpdir)
1070
1071        # Walk up tmpdir, deleting as we go
1072        for path, dirs, files in os.walk(tmpdir, topdown=False):
1073            for f in files:
1074                os.remove(os.path.join(path, f))
1075            for d in dirs:
1076                os.rmdir(os.path.join(path, d))
1077        os.rmdir(tmpdir)
1078
1079        # Insert the experiment into our state and update the disk copy
1080        self.state_lock.acquire()
1081        self.state[expid]['experimentStatus'] = 'active'
1082        self.state[eid] = self.state[expid]
1083        if self.state_filename: self.write_state()
1084        self.state_lock.release()
1085        return
1086
1087
1088    def add_kit(self, e, kit):
1089        """
1090        Add a Software object created from the list of (install, location)
1091        tuples passed as kit  to the software attribute of an object e.  We
1092        do this enough to break out the code, but it's kind of a hack to
1093        avoid changing the old tuple rep.
1094        """
1095
1096        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1097
1098        if isinstance(e.software, list): e.software.extend(s)
1099        else: e.software = s
1100
1101
1102    def create_experiment_state(self, fid, req, expid, expcert):
1103        """
1104        Create the initial entry in the experiment's state.  The expid and
1105        expcert are the experiment's fedid and certifacte that represents that
1106        ID, which are installed in the experiment state.  If the request
1107        includes a suggested local name that is used if possible.  If the local
1108        name is already taken by an experiment owned by this user that has
1109        failed, it is overwriutten.  Otherwise new letters are added until a
1110        valid localname is found.  The generated local name is returned.
1111        """
1112
1113        if req.has_key('experimentID') and \
1114                req['experimentID'].has_key('localname'):
1115            overwrite = False
1116            eid = req['experimentID']['localname']
1117            # If there's an old failed experiment here with the same local name
1118            # and accessible by this user, we'll overwrite it, otherwise we'll
1119            # fall through and do the collision avoidance.
1120            old_expid = self.get_experiment_fedid(eid)
1121            if old_expid and self.check_experiment_access(fid, old_expid):
1122                self.state_lock.acquire()
1123                status = self.state[eid].get('experimentStatus', None)
1124                if status and status == 'failed':
1125                    # remove the old access attribute
1126                    self.auth.unset_attribute(fid, old_expid)
1127                    overwrite = True
1128                    del self.state[eid]
1129                    del self.state[old_expid]
1130                self.state_lock.release()
1131            self.state_lock.acquire()
1132            while (self.state.has_key(eid) and not overwrite):
1133                eid += random.choice(string.ascii_letters)
1134            # Initial state
1135            self.state[eid] = {
1136                    'experimentID' : \
1137                            [ { 'localname' : eid }, {'fedid': expid } ],
1138                    'experimentStatus': 'starting',
1139                    'experimentAccess': { 'X509' : expcert },
1140                    'owner': fid,
1141                    'log' : [],
1142                }
1143            self.state[expid] = self.state[eid]
1144            if self.state_filename: self.write_state()
1145            self.state_lock.release()
1146        else:
1147            eid = self.exp_stem
1148            for i in range(0,5):
1149                eid += random.choice(string.ascii_letters)
1150            self.state_lock.acquire()
1151            while (self.state.has_key(eid)):
1152                eid = self.exp_stem
1153                for i in range(0,5):
1154                    eid += random.choice(string.ascii_letters)
1155            # Initial state
1156            self.state[eid] = {
1157                    'experimentID' : \
1158                            [ { 'localname' : eid }, {'fedid': expid } ],
1159                    'experimentStatus': 'starting',
1160                    'experimentAccess': { 'X509' : expcert },
1161                    'owner': fid,
1162                    'log' : [],
1163                }
1164            self.state[expid] = self.state[eid]
1165            if self.state_filename: self.write_state()
1166            self.state_lock.release()
1167
1168        return eid
1169
1170
1171    def allocate_ips_to_topo(self, top):
1172        """
1173        Add an ip4_address attribute to all the hosts in teh topology, based on
1174        the shared substrates on which they sit.  An /etc/hosts file is also
1175        created and returned as a list of hostfiles entries.
1176        """
1177        subs = sorted(top.substrates, 
1178                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1179                reverse=True)
1180        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1181        ifs = { }
1182        hosts = [ ]
1183
1184        for idx, s in enumerate(subs):
1185            a = ips.allocate(len(s.interfaces)+2)
1186            if a :
1187                base, num = a
1188                if num < len(s.interfaces) +2 : 
1189                    raise service_error(service_error.internal,
1190                            "Allocator returned wrong number of IPs??")
1191            else:
1192                raise service_error(service_error.req, 
1193                        "Cannot allocate IP addresses")
1194
1195            base += 1
1196            for i in s.interfaces:
1197                i.attribute.append(
1198                        topdl.Attribute('ip4_address', 
1199                            "%s" % ip_addr(base)))
1200                hname = i.element.name[0]
1201                if ifs.has_key(hname):
1202                    hosts.append("%s\t%s-%s %s-%d" % \
1203                            (ip_addr(base), hname, s.name, hname,
1204                                ifs[hname]))
1205                else:
1206                    ifs[hname] = 0
1207                    hosts.append("%s\t%s-%s %s-%d %s" % \
1208                            (ip_addr(base), hname, s.name, hname,
1209                                ifs[hname], hname))
1210
1211                ifs[hname] += 1
1212                base += 1
1213        return hosts
1214
1215    def get_access_to_testbeds(self, testbeds, user, access_user, 
1216            export_project, master, allocated, tbparams):
1217        """
1218        Request access to the various testbeds required for this instantiation
1219        (passed in as testbeds).  User, access_user, expoert_project and master
1220        are used to construct the correct requests.  Per-testbed parameters are
1221        returned in tbparams.
1222        """
1223        for tb in testbeds:
1224            self.get_access(tb, None, user, tbparams, master,
1225                    export_project, access_user)
1226            allocated[tb] = 1
1227
1228    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1229        """
1230        Create the sub-topologies that are needed for experimetn instantiation.
1231        Along the way attach startup commands to the computers in the
1232        subtopologies.
1233        """
1234        for tb in testbeds:
1235            topo[tb] = top.clone()
1236            to_delete = [ ]
1237            for e in topo[tb].elements:
1238                etb = e.get_attribute('testbed')
1239                if etb and etb != tb:
1240                    for i in e.interface:
1241                        for s in i.subs:
1242                            try:
1243                                s.interfaces.remove(i)
1244                            except ValueError:
1245                                raise service_error(service_error.internal,
1246                                        "Can't remove interface??")
1247                    to_delete.append(e)
1248            for e in to_delete:
1249                topo[tb].elements.remove(e)
1250            topo[tb].make_indices()
1251
1252            for e in [ e for e in topo[tb].elements \
1253                    if isinstance(e,topdl.Computer)]:
1254                if tb == master:
1255                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1256                else:
1257                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1258                scmd = e.get_attribute('startup')
1259                if scmd:
1260                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1261
1262                e.set_attribute('startup', cmd)
1263                if self.fedkit: self.add_kit(e, self.fedkit)
1264
1265    def add_portals(self, top, topo, eid, master, tbparams):
1266        """
1267        For each substrate in the main topology, find those that
1268        have nodes on more than one testbed.  Insert portal nodes
1269        into the copies of those substrates on the sub topologies.
1270        """
1271        for s in top.substrates:
1272            # tbs will contain an ip address on this subsrate that is in
1273            # each testbed.
1274            tbs = { }
1275            for i in s.interfaces:
1276                e = i.element
1277                tb = e.get_attribute('testbed')
1278                if tb and not tbs.has_key(tb):
1279                    for i in e.interface:
1280                        if s in i.subs:
1281                            tbs[tb]= i.get_attribute('ip4_address')
1282            if len(tbs) < 2:
1283                continue
1284
1285            # More than one testbed is on this substrate.  Insert
1286            # some portals into the subtopologies.  st == source testbed,
1287            # dt == destination testbed.
1288            segment_substrate = { }
1289            for st in tbs.keys():
1290                segment_substrate[st] = { }
1291                for dt in [ t for t in tbs.keys() if t != st]:
1292                    myname =  "%stunnel" % dt
1293                    desthost  =  "%stunnel" % st
1294                    sproject = tbparams[st].get('project', 'project')
1295                    dproject = tbparams[dt].get('project', 'project')
1296                    mproject = tbparams[master].get('project', 'project')
1297                    sdomain = tbparams[st].get('domain', ".example.com")
1298                    ddomain = tbparams[dt].get('domain', ".example.com")
1299                    mdomain = tbparams[master].get('domain', '.example.com')
1300                    muser = tbparams[master].get('user', 'root')
1301                    smbshare = tbparams[master].get('smbshare', 'USERS')
1302                    # XXX: active and type need to be unkludged
1303                    active = ("%s" % (st == master))
1304                    if not segment_substrate[st].has_key(dt):
1305                        # Put a substrate and a segment for the connected
1306                        # testbed in there.
1307                        tsubstrate = \
1308                                topdl.Substrate(name='%s-%s' % (st, dt),
1309                                        attribute= [
1310                                            topdl.Attribute(
1311                                                attribute='portal',
1312                                                value='true')
1313                                            ]
1314                                        )
1315                        segment_element = topdl.Segment(
1316                                id= tbparams[dt]['allocID'],
1317                                type='emulab',
1318                                uri = self.tbmap.get(dt, None),
1319                                interface=[ 
1320                                    topdl.Interface(
1321                                        substrate=tsubstrate.name),
1322                                    ],
1323                                attribute = [
1324                                    topdl.Attribute(attribute=n, value=v)
1325                                        for n, v in (\
1326                                            ('domain', ddomain),
1327                                            ('experiment', "%s/%s" % \
1328                                                    (dproject, eid)),)
1329                                    ],
1330                                )
1331                        segment_substrate[st][dt] = tsubstrate
1332                        topo[st].substrates.append(tsubstrate)
1333                        topo[st].elements.append(segment_element)
1334                    portal = topdl.Computer(
1335                            name="%stunnel" % dt,
1336                            attribute=[ 
1337                                topdl.Attribute(attribute=n,value=v)
1338                                    for n, v in (\
1339                                        ('portal', 'true'),
1340                                        ('domain', sdomain),
1341                                        ('masterdomain', mdomain),
1342                                        ('masterexperiment', "%s/%s" % \
1343                                                (mproject, eid)),
1344                                        ('masteruser', muser),
1345                                        ('smbshare', smbshare),
1346                                        ('experiment', "%s/%s" % \
1347                                                (sproject, eid)),
1348                                        ('peer', "%s" % desthost),
1349                                        ('peer_segment', "%s" % \
1350                                                tbparams[dt]['allocID']['fedid']),
1351                                        ('scriptdir', 
1352                                            "/usr/local/federation/bin"),
1353                                        ('active', "%s" % active),
1354                                        ('portal_type', 'both'), 
1355                                        ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1356                                ],
1357                            interface=[
1358                                topdl.Interface(
1359                                    substrate=s.name,
1360                                    attribute=[ 
1361                                        topdl.Attribute(
1362                                            attribute='ip4_address', 
1363                                            value=tbs[dt]
1364                                        )
1365                                    ]),
1366                                topdl.Interface(
1367                                    substrate=\
1368                                        segment_substrate[st][dt].name,
1369                                    attribute=[
1370                                        topdl.Attribute(attribute='portal',
1371                                            value='true')
1372                                        ]
1373                                    ),
1374                                ],
1375                            )
1376                    if self.fedkit: self.add_kit(portal, self.fedkit)
1377                    if self.gatewaykit: self.add_kit(portal, self.gatewaykit)
1378
1379                    topo[st].elements.append(portal)
1380
1381        # Connect the gateway nodes into the topologies and clear out
1382        # substrates that are not in the topologies
1383        for tb in tbparams.keys():
1384            topo[tb].incorporate_elements()
1385            topo[tb].substrates = \
1386                    [s for s in topo[tb].substrates \
1387                        if len(s.interfaces) >0]
1388
1389    def wrangle_software(self, expid, top, topo, tbparams):
1390        """
1391        Copy software out to the repository directory, allocate permissions and
1392        rewrite the segment topologies to look for the software in local
1393        places.
1394        """
1395
1396        # Copy the rpms and tarfiles to a distribution directory from
1397        # which the federants can retrieve them
1398        linkpath = "%s/software" %  expid
1399        softdir ="%s/%s" % ( self.repodir, linkpath)
1400        softmap = { }
1401        # These are in a list of tuples format (each kit).  This comprehension
1402        # unwraps them into a single list of tuples that initilaizes the set of
1403        # tuples.
1404        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1405                for p, t in l ])
1406        pkgs.update([x.location for e in top.elements \
1407                for x in e.software])
1408        try:
1409            os.makedirs(softdir)
1410        except IOError, e:
1411            raise service_error(
1412                    "Cannot create software directory: %s" % e)
1413        # The actual copying.  Everything's converted into a url for copying.
1414        for pkg in pkgs:
1415            loc = pkg
1416
1417            scheme, host, path = urlparse(loc)[0:3]
1418            dest = os.path.basename(path)
1419            if not scheme:
1420                if not loc.startswith('/'):
1421                    loc = "/%s" % loc
1422                loc = "file://%s" %loc
1423            try:
1424                u = urlopen(loc)
1425            except Exception, e:
1426                raise service_error(service_error.req, 
1427                        "Cannot open %s: %s" % (loc, e))
1428            try:
1429                f = open("%s/%s" % (softdir, dest) , "w")
1430                self.log.debug("Writing %s/%s" % (softdir,dest) )
1431                data = u.read(4096)
1432                while data:
1433                    f.write(data)
1434                    data = u.read(4096)
1435                f.close()
1436                u.close()
1437            except Exception, e:
1438                raise service_error(service_error.internal,
1439                        "Could not copy %s: %s" % (loc, e))
1440            path = re.sub("/tmp", "", linkpath)
1441            # XXX
1442            softmap[pkg] = \
1443                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1444                    ( path, dest)
1445
1446            # Allow the individual segments to access the software.
1447            for tb in tbparams.keys():
1448                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1449                        "/%s/%s" % ( path, dest))
1450
1451        # Convert the software locations in the segments into the local
1452        # copies on this host
1453        for soft in [ s for tb in topo.values() \
1454                for e in tb.elements \
1455                    if getattr(e, 'software', False) \
1456                        for s in e.software ]:
1457            if softmap.has_key(soft.location):
1458                soft.location = softmap[soft.location]
1459
1460
1461    def create_experiment(self, req, fid):
1462        """
1463        The external interface to experiment creation called from the
1464        dispatcher.
1465
1466        Creates a working directory, splits the incoming description using the
1467        splitter script and parses out the avrious subsections using the
1468        lcasses above.  Once each sub-experiment is created, use pooled threads
1469        to instantiate them and start it all up.
1470        """
1471        if not self.auth.check_attribute(fid, 'create'):
1472            raise service_error(service_error.access, "Create access denied")
1473
1474        try:
1475            tmpdir = tempfile.mkdtemp(prefix="split-")
1476            os.mkdir(tmpdir+"/keys")
1477        except IOError:
1478            raise service_error(service_error.internal, "Cannot create tmp dir")
1479
1480        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1481        gw_secretkey_base = "fed.%s" % self.ssh_type
1482        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1483        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1484        tclfile = tmpdir + "/experiment.tcl"
1485        tbparams = { }
1486        try:
1487            access_user = self.accessdb[fid]
1488        except KeyError:
1489            raise service_error(service_error.internal,
1490                    "Access map and authorizer out of sync in " + \
1491                            "create_experiment for fedid %s"  % fid)
1492
1493        pid = "dummy"
1494        gid = "dummy"
1495
1496        req = req.get('CreateRequestBody', None)
1497        if not req:
1498            raise service_error(service_error.req,
1499                    "Bad request format (no CreateRequestBody)")
1500        # The tcl parser needs to read a file so put the content into that file
1501        descr=req.get('experimentdescription', None)
1502        if descr:
1503            file_content=descr.get('ns2description', None)
1504            if file_content:
1505                try:
1506                    f = open(tclfile, 'w')
1507                    f.write(file_content)
1508                    f.close()
1509                except IOError:
1510                    raise service_error(service_error.internal,
1511                            "Cannot write temp experiment description")
1512            else:
1513                raise service_error(service_error.req, 
1514                        "Only ns2descriptions supported")
1515        else:
1516            raise service_error(service_error.req, "No experiment description")
1517
1518        # Generate an ID for the experiment (slice) and a certificate that the
1519        # allocator can use to prove they own it.  We'll ship it back through
1520        # the encrypted connection.
1521        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1522
1523        eid = self.create_experiment_state(fid, req, expid, expcert)
1524        try: 
1525            # This catches exceptions to clear the placeholder if necessary
1526            try:
1527                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1528            except ValueError:
1529                raise service_error(service_error.server_config, 
1530                        "Bad key type (%s)" % self.ssh_type)
1531
1532            user = req.get('user', None)
1533            if user == None:
1534                raise service_error(service_error.req, "No user")
1535
1536            master = req.get('master', None)
1537            if not master:
1538                raise service_error(service_error.req,
1539                        "No master testbed label")
1540            export_project = req.get('exportProject', None)
1541            if not export_project:
1542                raise service_error(service_error.req, "No export project")
1543           
1544            # Translate to topdl
1545            if self.splitter_url:
1546                # XXX: need remote topdl translator
1547                self.log.debug("Calling remote splitter at %s" % \
1548                        self.splitter_url)
1549                split_data = self.remote_splitter(self.splitter_url,
1550                        file_content, master)
1551            else:
1552                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1553                    str(self.muxmax), '-m', master]
1554
1555                if self.fedkit:
1556                    tclcmd.append('-k')
1557
1558                if self.gatewaykit:
1559                    tclcmd.append('-K')
1560
1561                tclcmd.extend([pid, gid, eid, tclfile])
1562
1563                self.log.debug("running local splitter %s", " ".join(tclcmd))
1564                # This is just fantastic.  As a side effect the parser copies
1565                # tb_compat.tcl into the current directory, so that directory
1566                # must be writable by the fedd user.  Doing this in the
1567                # temporary subdir ensures this is the case.
1568                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1569                        cwd=tmpdir)
1570                split_data = tclparser.stdout
1571
1572            top = topdl.topology_from_xml(file=split_data, top="experiment")
1573
1574            hosts = self.allocate_ips_to_topo(top)
1575             # Find the testbeds to look up
1576            testbeds = set([ a.value for e in top.elements \
1577                    for a in e.attribute \
1578                    if a.attribute == 'testbed'] )
1579
1580            allocated = { }         # Testbeds we can access
1581            topo ={ }               # Sub topologies
1582            self.get_access_to_testbeds(testbeds, user, access_user, 
1583                    export_project, master, allocated, tbparams)
1584            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1585
1586            # Copy configuration files into the remote file store
1587            # The config urlpath
1588            configpath = "/%s/config" % expid
1589            # The config file system location
1590            configdir ="%s%s" % ( self.repodir, configpath)
1591            try:
1592                os.makedirs(configdir)
1593            except IOError, e:
1594                raise service_error(
1595                        "Cannot create config directory: %s" % e)
1596            try:
1597                f = open("%s/hosts" % configdir, "w")
1598                f.write('\n'.join(hosts))
1599                f.close()
1600            except IOError, e:
1601                raise service_error(service_error.internal, 
1602                        "Cannot write hosts file: %s" % e)
1603            try:
1604                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
1605                        (configdir, gw_pubkey_base))
1606                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
1607                        (configdir, gw_secretkey_base))
1608            except IOError, e:
1609                raise service_error(service_error.internal, 
1610                        "Cannot copy keyfiles: %s" % e)
1611
1612            # Allow the individual testbeds to access the configuration files.
1613            for tb in tbparams.keys():
1614                asignee = tbparams[tb]['allocID']['fedid']
1615                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1616                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1617
1618            self.add_portals(top, topo, eid, master, tbparams)
1619            self.wrangle_software(expid, top, topo, tbparams)
1620
1621            vtopo = topdl.topology_to_vtopo(top)
1622            vis = self.genviz(vtopo)
1623
1624            # save federant information
1625            for k in allocated.keys():
1626                tbparams[k]['federant'] = {\
1627                        'name': [ { 'localname' : eid} ],\
1628                        'emulab': tbparams[k]['emulab'],\
1629                        'allocID' : tbparams[k]['allocID'],\
1630                        'master' : k == master,\
1631                    }
1632
1633            self.state_lock.acquire()
1634            self.state[eid]['vtopo'] = vtopo
1635            self.state[eid]['vis'] = vis
1636            self.state[expid]['federant'] = \
1637                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1638                        if tbparams[tb].has_key('federant') ]
1639            if self.state_filename: 
1640                self.write_state()
1641            self.state_lock.release()
1642        except service_error, e:
1643            # If something goes wrong in the parse (usually an access error)
1644            # clear the placeholder state.  From here on out the code delays
1645            # exceptions.  Failing at this point returns a fault to the remote
1646            # caller.
1647
1648            self.state_lock.acquire()
1649            del self.state[eid]
1650            del self.state[expid]
1651            if self.state_filename: self.write_state()
1652            self.state_lock.release()
1653            raise e
1654
1655
1656        # Start the background swapper and return the starting state.  From
1657        # here on out, the state will stick around a while.
1658
1659        # Let users touch the state
1660        self.auth.set_attribute(fid, expid)
1661        self.auth.set_attribute(expid, expid)
1662        # Override fedids can manipulate state as well
1663        for o in self.overrides:
1664            self.auth.set_attribute(o, expid)
1665
1666        # Create a logger that logs to the experiment's state object as well as
1667        # to the main log file.
1668        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1669        alloc_collector = self.list_log(self.state[eid]['log'])
1670        h = logging.StreamHandler(alloc_collector)
1671        # XXX: there should be a global one of these rather than repeating the
1672        # code.
1673        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1674                    '%d %b %y %H:%M:%S'))
1675        alloc_log.addHandler(h)
1676       
1677        # XXX
1678        url_base = 'https://users.isi.deterlab.net:23232'
1679        attrs = [ 
1680                {
1681                    'attribute': 'ssh_pubkey', 
1682                    'value': '%s/%s/config/%s' % \
1683                            (url_base, expid, gw_pubkey_base)
1684                },
1685                {
1686                    'attribute': 'ssh_secretkey', 
1687                    'value': '%s/%s/config/%s' % \
1688                            (url_base, expid, gw_secretkey_base)
1689                },
1690                {
1691                    'attribute': 'hosts', 
1692                    'value': '%s/%s/config/hosts' % \
1693                            (url_base, expid)
1694                },
1695                {
1696                    'attribute': 'experiment_name',
1697                    'value': eid,
1698                },
1699            ]
1700
1701        # Start a thread to do the resource allocation
1702        t  = Thread(target=self.allocate_resources,
1703                args=(allocated, master, eid, expid, expcert, tbparams, 
1704                    topo, tmpdir, alloc_log, alloc_collector, attrs),
1705                name=eid)
1706        t.start()
1707
1708        rv = {
1709                'experimentID': [
1710                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1711                ],
1712                'experimentStatus': 'starting',
1713                'experimentAccess': { 'X509' : expcert }
1714            }
1715
1716        return rv
1717   
1718    def get_experiment_fedid(self, key):
1719        """
1720        find the fedid associated with the localname key in the state database.
1721        """
1722
1723        rv = None
1724        self.state_lock.acquire()
1725        if self.state.has_key(key):
1726            if isinstance(self.state[key], dict):
1727                try:
1728                    kl = [ f['fedid'] for f in \
1729                            self.state[key]['experimentID']\
1730                                if f.has_key('fedid') ]
1731                except KeyError:
1732                    self.state_lock.release()
1733                    raise service_error(service_error.internal, 
1734                            "No fedid for experiment %s when getting "+\
1735                                    "fedid(!?)" % key)
1736                if len(kl) == 1:
1737                    rv = kl[0]
1738                else:
1739                    self.state_lock.release()
1740                    raise service_error(service_error.internal, 
1741                            "multiple fedids for experiment %s when " +\
1742                                    "getting fedid(!?)" % key)
1743            else:
1744                self.state_lock.release()
1745                raise service_error(service_error.internal, 
1746                        "Unexpected state for %s" % key)
1747        self.state_lock.release()
1748        return rv
1749
1750    def check_experiment_access(self, fid, key):
1751        """
1752        Confirm that the fid has access to the experiment.  Though a request
1753        may be made in terms of a local name, the access attribute is always
1754        the experiment's fedid.
1755        """
1756        if not isinstance(key, fedid):
1757            key = self.get_experiment_fedid(key)
1758
1759        if self.auth.check_attribute(fid, key):
1760            return True
1761        else:
1762            raise service_error(service_error.access, "Access Denied")
1763
1764
1765    def get_handler(self, path, fid):
1766        if self.auth.check_attribute(fid, path):
1767            return ("%s/%s" % (self.repodir, path), "application/binary")
1768        else:
1769            return (None, None)
1770
1771    def get_vtopo(self, req, fid):
1772        """
1773        Return the stored virtual topology for this experiment
1774        """
1775        rv = None
1776        state = None
1777
1778        req = req.get('VtopoRequestBody', None)
1779        if not req:
1780            raise service_error(service_error.req,
1781                    "Bad request format (no VtopoRequestBody)")
1782        exp = req.get('experiment', None)
1783        if exp:
1784            if exp.has_key('fedid'):
1785                key = exp['fedid']
1786                keytype = "fedid"
1787            elif exp.has_key('localname'):
1788                key = exp['localname']
1789                keytype = "localname"
1790            else:
1791                raise service_error(service_error.req, "Unknown lookup type")
1792        else:
1793            raise service_error(service_error.req, "No request?")
1794
1795        self.check_experiment_access(fid, key)
1796
1797        self.state_lock.acquire()
1798        if self.state.has_key(key):
1799            if self.state[key].has_key('vtopo'):
1800                rv = { 'experiment' : {keytype: key },\
1801                        'vtopo': self.state[key]['vtopo'],\
1802                    }
1803            else:
1804                state = self.state[key]['experimentStatus']
1805        self.state_lock.release()
1806
1807        if rv: return rv
1808        else: 
1809            if state:
1810                raise service_error(service_error.partial, 
1811                        "Not ready: %s" % state)
1812            else:
1813                raise service_error(service_error.req, "No such experiment")
1814
1815    def get_vis(self, req, fid):
1816        """
1817        Return the stored visualization for this experiment
1818        """
1819        rv = None
1820        state = None
1821
1822        req = req.get('VisRequestBody', None)
1823        if not req:
1824            raise service_error(service_error.req,
1825                    "Bad request format (no VisRequestBody)")
1826        exp = req.get('experiment', None)
1827        if exp:
1828            if exp.has_key('fedid'):
1829                key = exp['fedid']
1830                keytype = "fedid"
1831            elif exp.has_key('localname'):
1832                key = exp['localname']
1833                keytype = "localname"
1834            else:
1835                raise service_error(service_error.req, "Unknown lookup type")
1836        else:
1837            raise service_error(service_error.req, "No request?")
1838
1839        self.check_experiment_access(fid, key)
1840
1841        self.state_lock.acquire()
1842        if self.state.has_key(key):
1843            if self.state[key].has_key('vis'):
1844                rv =  { 'experiment' : {keytype: key },\
1845                        'vis': self.state[key]['vis'],\
1846                        }
1847            else:
1848                state = self.state[key]['experimentStatus']
1849        self.state_lock.release()
1850
1851        if rv: return rv
1852        else:
1853            if state:
1854                raise service_error(service_error.partial, 
1855                        "Not ready: %s" % state)
1856            else:
1857                raise service_error(service_error.req, "No such experiment")
1858
1859    def clean_info_response(self, rv):
1860        """
1861        Remove the information in the experiment's state object that is not in
1862        the info response.
1863        """
1864        # Remove the owner info (should always be there, but...)
1865        if rv.has_key('owner'): del rv['owner']
1866
1867        # Convert the log into the allocationLog parameter and remove the
1868        # log entry (with defensive programming)
1869        if rv.has_key('log'):
1870            rv['allocationLog'] = "".join(rv['log'])
1871            del rv['log']
1872        else:
1873            rv['allocationLog'] = ""
1874
1875        if rv['experimentStatus'] != 'active':
1876            if rv.has_key('federant'): del rv['federant']
1877        else:
1878            # remove the allocationID info from each federant
1879            for f in rv.get('federant', []):
1880                if f.has_key('allocID'): del f['allocID']
1881        return rv
1882
1883    def get_info(self, req, fid):
1884        """
1885        Return all the stored info about this experiment
1886        """
1887        rv = None
1888
1889        req = req.get('InfoRequestBody', None)
1890        if not req:
1891            raise service_error(service_error.req,
1892                    "Bad request format (no InfoRequestBody)")
1893        exp = req.get('experiment', None)
1894        if exp:
1895            if exp.has_key('fedid'):
1896                key = exp['fedid']
1897                keytype = "fedid"
1898            elif exp.has_key('localname'):
1899                key = exp['localname']
1900                keytype = "localname"
1901            else:
1902                raise service_error(service_error.req, "Unknown lookup type")
1903        else:
1904            raise service_error(service_error.req, "No request?")
1905
1906        self.check_experiment_access(fid, key)
1907
1908        # The state may be massaged by the service function that called
1909        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1910        # state.
1911        self.state_lock.acquire()
1912        if self.state.has_key(key):
1913            rv = copy.deepcopy(self.state[key])
1914        self.state_lock.release()
1915
1916        if rv:
1917            return self.clean_info_response(rv)
1918        else:
1919            raise service_error(service_error.req, "No such experiment")
1920
1921    def get_multi_info(self, req, fid):
1922        """
1923        Return all the stored info that this fedid can access
1924        """
1925        rv = { 'info': [ ] }
1926
1927        self.state_lock.acquire()
1928        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
1929            self.check_experiment_access(fid, key)
1930
1931            if self.state.has_key(key):
1932                e = copy.deepcopy(self.state[key])
1933                e = self.clean_info_response(e)
1934                rv['info'].append(e)
1935        self.state_lock.release()
1936        return rv
1937
1938    def terminate_experiment(self, req, fid):
1939        """
1940        Swap this experiment out on the federants and delete the shared
1941        information
1942        """
1943        tbparams = { }
1944        req = req.get('TerminateRequestBody', None)
1945        if not req:
1946            raise service_error(service_error.req,
1947                    "Bad request format (no TerminateRequestBody)")
1948        force = req.get('force', False)
1949        exp = req.get('experiment', None)
1950        if exp:
1951            if exp.has_key('fedid'):
1952                key = exp['fedid']
1953                keytype = "fedid"
1954            elif exp.has_key('localname'):
1955                key = exp['localname']
1956                keytype = "localname"
1957            else:
1958                raise service_error(service_error.req, "Unknown lookup type")
1959        else:
1960            raise service_error(service_error.req, "No request?")
1961
1962        self.check_experiment_access(fid, key)
1963
1964        dealloc_list = [ ]
1965
1966
1967        # Create a logger that logs to the dealloc_list as well as to the main
1968        # log file.
1969        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
1970        h = logging.StreamHandler(self.list_log(dealloc_list))
1971        # XXX: there should be a global one of these rather than repeating the
1972        # code.
1973        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1974                    '%d %b %y %H:%M:%S'))
1975        dealloc_log.addHandler(h)
1976
1977        self.state_lock.acquire()
1978        fed_exp = self.state.get(key, None)
1979
1980        if fed_exp:
1981            # This branch of the conditional holds the lock to generate a
1982            # consistent temporary tbparams variable to deallocate experiments.
1983            # It releases the lock to do the deallocations and reacquires it to
1984            # remove the experiment state when the termination is complete.
1985
1986            # First make sure that the experiment creation is complete.
1987            status = fed_exp.get('experimentStatus', None)
1988
1989            if status:
1990                if status in ('starting', 'terminating'):
1991                    if not force:
1992                        self.state_lock.release()
1993                        raise service_error(service_error.partial, 
1994                                'Experiment still being created or destroyed')
1995                    else:
1996                        self.log.warning('Experiment in %s state ' % status + \
1997                                'being terminated by force.')
1998            else:
1999                # No status??? trouble
2000                self.state_lock.release()
2001                raise service_error(service_error.internal,
2002                        "Experiment has no status!?")
2003
2004            ids = []
2005            #  experimentID is a list of dicts that are self-describing
2006            #  identifiers.  This finds all the fedids and localnames - the
2007            #  keys of self.state - and puts them into ids.
2008            for id in fed_exp.get('experimentID', []):
2009                if id.has_key('fedid'): ids.append(id['fedid'])
2010                if id.has_key('localname'): ids.append(id['localname'])
2011
2012            # Collect the allocation/segment ids
2013            for fed in fed_exp.get('federant', []):
2014                try:
2015                    tb = fed['emulab']['project']['testbed']['localname']
2016                    aid = fed['allocID']
2017                except KeyError, e:
2018                    continue
2019                tbparams[tb] = aid
2020            fed_exp['experimentStatus'] = 'terminating'
2021            if self.state_filename: self.write_state()
2022            self.state_lock.release()
2023
2024            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2025            # then completes, so we can't wait if nothing starts.  So, no
2026            # tbparams, no start.
2027            if len(tbparams) > 0:
2028                thread_pool = self.thread_pool(self.nthreads)
2029                for tb in tbparams.keys():
2030                    # Create and start a thread to stop the segment
2031                    thread_pool.wait_for_slot()
2032                    uri = self.tbmap.get(tb, None)
2033                    t  = self.pooled_thread(\
2034                            target=self.terminate_segment(log=dealloc_log,
2035                                testbed=tb,
2036                                cert_file=self.cert_file, 
2037                                cert_pwd=self.cert_pwd,
2038                                trusted_certs=self.trusted_certs,
2039                                caller=self.call_TerminateSegment),
2040                            args=(uri, tbparams[tb]), name=tb,
2041                            pdata=thread_pool, trace_file=self.trace_file)
2042                    t.start()
2043                # Wait for completions
2044                thread_pool.wait_for_all_done()
2045
2046            # release the allocations (failed experiments have done this
2047            # already, and starting experiments may be in odd states, so we
2048            # ignore errors releasing those allocations
2049            try: 
2050                for tb in tbparams.keys():
2051                    self.release_access(tb, tbparams[tb])
2052            except service_error, e:
2053                if status != 'failed' and not force:
2054                    raise e
2055
2056            # Remove the terminated experiment
2057            self.state_lock.acquire()
2058            for id in ids:
2059                if self.state.has_key(id): del self.state[id]
2060
2061            if self.state_filename: self.write_state()
2062            self.state_lock.release()
2063
2064            return { 
2065                    'experiment': exp , 
2066                    'deallocationLog': "".join(dealloc_list),
2067                    }
2068        else:
2069            # Don't forget to release the lock
2070            self.state_lock.release()
2071            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.