source: fedd/federation/experiment_control.py @ 46bc576

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 46bc576 was ecf679e, checked in by Ted Faber <faber@…>, 15 years ago

fix terminate/multistatus

  • Property mode set to 100644
File size: 85.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221
222        self.exp_stem = "fed-stem"
223        self.log = logging.getLogger("fedd.experiment_control")
224        set_log_level(config, "experiment_control", self.log)
225        self.muxmax = 2
226        self.nthreads = 2
227        self.randomize_experiments = False
228
229        self.splitter = None
230        self.ssh_keygen = "/usr/bin/ssh-keygen"
231        self.ssh_identity_file = None
232
233
234        self.debug = config.getboolean("experiment_control", "create_debug")
235        self.cleanup = not config.getboolean("experiment_control", 
236                "leave_tmpfiles")
237        self.state_filename = config.get("experiment_control", 
238                "experiment_state")
239        self.splitter_url = config.get("experiment_control", "splitter_uri")
240        self.fedkit = parse_tarfile_list(\
241                config.get("experiment_control", "fedkit"))
242        self.gatewaykit = parse_tarfile_list(\
243                config.get("experiment_control", "gatewaykit"))
244        accessdb_file = config.get("experiment_control", "accessdb")
245
246        self.ssh_pubkey_file = config.get("experiment_control", 
247                "ssh_pubkey_file")
248        self.ssh_privkey_file = config.get("experiment_control",
249                "ssh_privkey_file")
250        # NB for internal master/slave ops, not experiment setup
251        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
252
253        self.overrides = set([])
254        ovr = config.get('experiment_control', 'overrides')
255        if ovr:
256            for o in ovr.split(","):
257                o = o.strip()
258                if o.startswith('fedid:'): o = o[len('fedid:'):]
259                self.overrides.add(fedid(hexstr=o))
260
261        self.state = { }
262        self.state_lock = Lock()
263        self.tclsh = "/usr/local/bin/otclsh"
264        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
265                config.get("experiment_control", "tcl_splitter",
266                        "/usr/testbed/lib/ns2ir/parse.tcl")
267        mapdb_file = config.get("experiment_control", "mapdb")
268        self.trace_file = sys.stderr
269
270        self.def_expstart = \
271                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
272                "/tmp/federate";
273        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
274                "FEDDIR/hosts";
275        self.def_gwstart = \
276                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
277                "/tmp/bridge.log";
278        self.def_mgwstart = \
279                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
280                "/tmp/bridge.log";
281        self.def_gwimage = "FBSD61-TUNNEL2";
282        self.def_gwtype = "pc";
283        self.local_access = { }
284
285        if auth:
286            self.auth = auth
287        else:
288            self.log.error(\
289                    "[access]: No authorizer initialized, creating local one.")
290            auth = authorizer()
291
292
293        if self.ssh_pubkey_file:
294            try:
295                f = open(self.ssh_pubkey_file, 'r')
296                self.ssh_pubkey = f.read()
297                f.close()
298            except IOError:
299                raise service_error(service_error.internal,
300                        "Cannot read sshpubkey")
301        else:
302            raise service_error(service_error.internal, 
303                    "No SSH public key file?")
304
305        if not self.ssh_privkey_file:
306            raise service_error(service_error.internal, 
307                    "No SSH public key file?")
308
309
310        if mapdb_file:
311            self.read_mapdb(mapdb_file)
312        else:
313            self.log.warn("[experiment_control] No testbed map, using defaults")
314            self.tbmap = { 
315                    'deter':'https://users.isi.deterlab.net:23235',
316                    'emulab':'https://users.isi.deterlab.net:23236',
317                    'ucb':'https://users.isi.deterlab.net:23237',
318                    }
319
320        if accessdb_file:
321                self.read_accessdb(accessdb_file)
322        else:
323            raise service_error(service_error.internal,
324                    "No accessdb specified in config")
325
326        # Grab saved state.  OK to do this w/o locking because it's read only
327        # and only one thread should be in existence that can see self.state at
328        # this point.
329        if self.state_filename:
330            self.read_state()
331
332        # Dispatch tables
333        self.soap_services = {\
334                'Create': soap_handler('Create', self.create_experiment),
335                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
336                'Vis': soap_handler('Vis', self.get_vis),
337                'Info': soap_handler('Info', self.get_info),
338                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
339                'Terminate': soap_handler('Terminate', 
340                    self.terminate_experiment),
341        }
342
343        self.xmlrpc_services = {\
344                'Create': xmlrpc_handler('Create', self.create_experiment),
345                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
346                'Vis': xmlrpc_handler('Vis', self.get_vis),
347                'Info': xmlrpc_handler('Info', self.get_info),
348                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
349                'Terminate': xmlrpc_handler('Terminate',
350                    self.terminate_experiment),
351        }
352
353    # Call while holding self.state_lock
354    def write_state(self):
355        """
356        Write a new copy of experiment state after copying the existing state
357        to a backup.
358
359        State format is a simple pickling of the state dictionary.
360        """
361        if os.access(self.state_filename, os.W_OK):
362            copy_file(self.state_filename, \
363                    "%s.bak" % self.state_filename)
364        try:
365            f = open(self.state_filename, 'w')
366            pickle.dump(self.state, f)
367        except IOError, e:
368            self.log.error("Can't write file %s: %s" % \
369                    (self.state_filename, e))
370        except pickle.PicklingError, e:
371            self.log.error("Pickling problem: %s" % e)
372        except TypeError, e:
373            self.log.error("Pickling problem (TypeError): %s" % e)
374
375    # Call while holding self.state_lock
376    def read_state(self):
377        """
378        Read a new copy of experiment state.  Old state is overwritten.
379
380        State format is a simple pickling of the state dictionary.
381        """
382       
383        def get_experiment_id(state):
384            """
385            Pull the fedid experimentID out of the saved state.  This is kind
386            of a gross walk through the dict.
387            """
388
389            if state.has_key('experimentID'):
390                for e in state['experimentID']:
391                    if e.has_key('fedid'):
392                        return e['fedid']
393                else:
394                    return None
395            else:
396                return None
397
398        def get_alloc_ids(state):
399            """
400            Pull the fedids of the identifiers of each allocation from the
401            state.  Again, a dict dive that's best isolated.
402            """
403
404            return [ f['allocID']['fedid'] 
405                    for f in state.get('federant',[]) \
406                        if f.has_key('allocID') and \
407                            f['allocID'].has_key('fedid')]
408
409
410        try:
411            f = open(self.state_filename, "r")
412            self.state = pickle.load(f)
413            self.log.debug("[read_state]: Read state from %s" % \
414                    self.state_filename)
415        except IOError, e:
416            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
417                    % (self.state_filename, e))
418        except pickle.UnpicklingError, e:
419            self.log.warning(("[read_state]: No saved state: " + \
420                    "Unpickling failed: %s") % e)
421       
422        for s in self.state.values():
423            try:
424
425                eid = get_experiment_id(s)
426                if eid : 
427                    # Give the owner rights to the experiment
428                    self.auth.set_attribute(s['owner'], eid)
429                    # And holders of the eid as well
430                    self.auth.set_attribute(eid, eid)
431                    # allow overrides to control experiments as well
432                    for o in self.overrides:
433                        self.auth.set_attribute(o, eid)
434                    # Set permissions to allow reading of the software repo, if
435                    # any, as well.
436                    for a in get_alloc_ids(s):
437                        self.auth.set_attribute(a, 'repo/%s' % eid)
438                else:
439                    raise KeyError("No experiment id")
440            except KeyError, e:
441                self.log.warning("[read_state]: State ownership or identity " +\
442                        "misformatted in %s: %s" % (self.state_filename, e))
443
444
445    def read_accessdb(self, accessdb_file):
446        """
447        Read the mapping from fedids that can create experiments to their name
448        in the 3-level access namespace.  All will be asserted from this
449        testbed and can include the local username and porject that will be
450        asserted on their behalf by this fedd.  Each fedid is also added to the
451        authorization system with the "create" attribute.
452        """
453        self.accessdb = {}
454        # These are the regexps for parsing the db
455        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
456        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
457                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
458        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
459                "\s*->\s*(" + name_expr + ")\s*$")
460        lineno = 0
461
462        # Parse the mappings and store in self.authdb, a dict of
463        # fedid -> (proj, user)
464        try:
465            f = open(accessdb_file, "r")
466            for line in f:
467                lineno += 1
468                line = line.strip()
469                if len(line) == 0 or line.startswith('#'):
470                    continue
471                m = project_line.match(line)
472                if m:
473                    fid = fedid(hexstr=m.group(1))
474                    project, user = m.group(2,3)
475                    if not self.accessdb.has_key(fid):
476                        self.accessdb[fid] = []
477                    self.accessdb[fid].append((project, user))
478                    continue
479
480                m = user_line.match(line)
481                if m:
482                    fid = fedid(hexstr=m.group(1))
483                    project = None
484                    user = m.group(2)
485                    if not self.accessdb.has_key(fid):
486                        self.accessdb[fid] = []
487                    self.accessdb[fid].append((project, user))
488                    continue
489                self.log.warn("[experiment_control] Error parsing access " +\
490                        "db %s at line %d" %  (accessdb_file, lineno))
491        except IOError:
492            raise service_error(service_error.internal,
493                    "Error opening/reading %s as experiment " +\
494                            "control accessdb" %  accessdb_file)
495        f.close()
496
497        # Initialize the authorization attributes
498        for fid in self.accessdb.keys():
499            self.auth.set_attribute(fid, 'create')
500
501    def read_mapdb(self, file):
502        """
503        Read a simple colon separated list of mappings for the
504        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
505        """
506
507        self.tbmap = { }
508        lineno =0
509        try:
510            f = open(file, "r")
511            for line in f:
512                lineno += 1
513                line = line.strip()
514                if line.startswith('#') or len(line) == 0:
515                    continue
516                try:
517                    label, url = line.split(':', 1)
518                    self.tbmap[label] = url
519                except ValueError, e:
520                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
521                            "map db: %s %s" % (lineno, line, e))
522        except IOError, e:
523            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
524                    "open %s: %s" % (file, e))
525        f.close()
526       
527    def generate_ssh_keys(self, dest, type="rsa" ):
528        """
529        Generate a set of keys for the gateways to use to talk.
530
531        Keys are of type type and are stored in the required dest file.
532        """
533        valid_types = ("rsa", "dsa")
534        t = type.lower();
535        if t not in valid_types: raise ValueError
536        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
537
538        try:
539            trace = open("/dev/null", "w")
540        except IOError:
541            raise service_error(service_error.internal,
542                    "Cannot open /dev/null??");
543
544        # May raise CalledProcessError
545        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
546        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
547        if rv != 0:
548            raise service_error(service_error.internal, 
549                    "Cannot generate nonce ssh keys.  %s return code %d" \
550                            % (self.ssh_keygen, rv))
551
552    def gentopo(self, str):
553        """
554        Generate the topology dtat structure from the splitter's XML
555        representation of it.
556
557        The topology XML looks like:
558            <experiment>
559                <nodes>
560                    <node><vname></vname><ips>ip1:ip2</ips></node>
561                </nodes>
562                <lans>
563                    <lan>
564                        <vname></vname><vnode></vnode><ip></ip>
565                        <bandwidth></bandwidth><member>node:port</member>
566                    </lan>
567                </lans>
568        """
569        class topo_parse:
570            """
571            Parse the topology XML and create the dats structure.
572            """
573            def __init__(self):
574                # Typing of the subelements for data conversion
575                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
576                self.int_subelements = ( 'bandwidth',)
577                self.float_subelements = ( 'delay',)
578                # The final data structure
579                self.nodes = [ ]
580                self.lans =  [ ]
581                self.topo = { \
582                        'node': self.nodes,\
583                        'lan' : self.lans,\
584                    }
585                self.element = { }  # Current element being created
586                self.chars = ""     # Last text seen
587
588            def end_element(self, name):
589                # After each sub element the contents is added to the current
590                # element or to the appropriate list.
591                if name == 'node':
592                    self.nodes.append(self.element)
593                    self.element = { }
594                elif name == 'lan':
595                    self.lans.append(self.element)
596                    self.element = { }
597                elif name in self.str_subelements:
598                    self.element[name] = self.chars
599                    self.chars = ""
600                elif name in self.int_subelements:
601                    self.element[name] = int(self.chars)
602                    self.chars = ""
603                elif name in self.float_subelements:
604                    self.element[name] = float(self.chars)
605                    self.chars = ""
606
607            def found_chars(self, data):
608                self.chars += data.rstrip()
609
610
611        tp = topo_parse();
612        parser = xml.parsers.expat.ParserCreate()
613        parser.EndElementHandler = tp.end_element
614        parser.CharacterDataHandler = tp.found_chars
615
616        parser.Parse(str)
617
618        return tp.topo
619       
620
621    def genviz(self, topo):
622        """
623        Generate the visualization the virtual topology
624        """
625
626        neato = "/usr/local/bin/neato"
627        # These are used to parse neato output and to create the visualization
628        # file.
629        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
630        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
631                "%s</type></node>"
632
633        try:
634            # Node names
635            nodes = [ n['vname'] for n in topo['node'] ]
636            topo_lans = topo['lan']
637        except KeyError, e:
638            raise service_error(service_error.internal, "Bad topology: %s" %e)
639
640        lans = { }
641        links = { }
642
643        # Walk through the virtual topology, organizing the connections into
644        # 2-node connections (links) and more-than-2-node connections (lans).
645        # When a lan is created, it's added to the list of nodes (there's a
646        # node in the visualization for the lan).
647        for l in topo_lans:
648            if links.has_key(l['vname']):
649                if len(links[l['vname']]) < 2:
650                    links[l['vname']].append(l['vnode'])
651                else:
652                    nodes.append(l['vname'])
653                    lans[l['vname']] = links[l['vname']]
654                    del links[l['vname']]
655                    lans[l['vname']].append(l['vnode'])
656            elif lans.has_key(l['vname']):
657                lans[l['vname']].append(l['vnode'])
658            else:
659                links[l['vname']] = [ l['vnode'] ]
660
661
662        # Open up a temporary file for dot to turn into a visualization
663        try:
664            df, dotname = tempfile.mkstemp()
665            dotfile = os.fdopen(df, 'w')
666        except IOError:
667            raise service_error(service_error.internal,
668                    "Failed to open file in genviz")
669
670        try:
671            dnull = open('/dev/null', 'w')
672        except IOError:
673            service_error(service_error.internal,
674                    "Failed to open /dev/null in genviz")
675
676        # Generate a dot/neato input file from the links, nodes and lans
677        try:
678            print >>dotfile, "graph G {"
679            for n in nodes:
680                print >>dotfile, '\t"%s"' % n
681            for l in links.keys():
682                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
683            for l in lans.keys():
684                for n in lans[l]:
685                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
686            print >>dotfile, "}"
687            dotfile.close()
688        except TypeError:
689            raise service_error(service_error.internal,
690                    "Single endpoint link in vtopo")
691        except IOError:
692            raise service_error(service_error.internal, "Cannot write dot file")
693
694        # Use dot to create a visualization
695        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
696                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
697                close_fds=True)
698        dnull.close()
699
700        # Translate dot to vis format
701        vis_nodes = [ ]
702        vis = { 'node': vis_nodes }
703        for line in dot.stdout:
704            m = vis_re.match(line)
705            if m:
706                vn = m.group(1)
707                vis_node = {'name': vn, \
708                        'x': float(m.group(2)),\
709                        'y' : float(m.group(3)),\
710                    }
711                if vn in links.keys() or vn in lans.keys():
712                    vis_node['type'] = 'lan'
713                else:
714                    vis_node['type'] = 'node'
715                vis_nodes.append(vis_node)
716        rv = dot.wait()
717
718        os.remove(dotname)
719        if rv == 0 : return vis
720        else: return None
721
722    def get_access(self, tb, nodes, user, tbparam, master, export_project,
723            access_user):
724        """
725        Get access to testbed through fedd and set the parameters for that tb
726        """
727        uri = self.tbmap.get(tb, None)
728        if not uri:
729            raise service_error(serice_error.server_config, 
730                    "Unknown testbed: %s" % tb)
731
732        # currently this lumps all users into one service access group
733        service_keys = [ a for u in user \
734                for a in u.get('access', []) \
735                    if a.has_key('sshPubkey')]
736
737        if len(service_keys) == 0:
738            raise service_error(service_error.req, 
739                    "Must have at least one SSH pubkey for services")
740
741
742        for p, u in access_user:
743            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
744                    "to %s") %  ((p or "None"), u, uri))
745
746            if p:
747                # Request with user and project specified
748                req = {\
749                        'destinationTestbed' : { 'uri' : uri },
750                        'project': { 
751                            'name': {'localname': p},
752                            'user': [ {'userID': { 'localname': u } } ],
753                            },
754                        'user':  user,
755                        'allocID' : { 'localname': 'test' },
756                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
757                        'serviceAccess' : service_keys
758                    }
759            else:
760                # Request with only user specified
761                req = {\
762                        'destinationTestbed' : { 'uri' : uri },
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
766                        'serviceAccess' : service_keys
767                    }
768
769            if tb == master:
770                # NB, the export_project parameter is a dict that includes
771                # the type
772                req['exportProject'] = export_project
773
774            # node resources if any
775            if nodes != None and len(nodes) > 0:
776                rnodes = [ ]
777                for n in nodes:
778                    rn = { }
779                    image, hw, count = n.split(":")
780                    if image: rn['image'] = [ image ]
781                    if hw: rn['hardware'] = [ hw ]
782                    if count and int(count) >0 : rn['count'] = int(count)
783                    rnodes.append(rn)
784                req['resources']= { }
785                req['resources']['node'] = rnodes
786
787            try:
788                if self.local_access.has_key(uri):
789                    # Local access call
790                    req = { 'RequestAccessRequestBody' : req }
791                    r = self.local_access[uri].RequestAccess(req, 
792                            fedid(file=self.cert_file))
793                    r = { 'RequestAccessResponseBody' : r }
794                else:
795                    r = self.call_RequestAccess(uri, req, 
796                            self.cert_file, self.cert_pwd, self.trusted_certs)
797            except service_error, e:
798                if e.code == service_error.access:
799                    self.log.debug("[get_access] Access denied")
800                    r = None
801                    continue
802                else:
803                    raise e
804
805            if r.has_key('RequestAccessResponseBody'):
806                # Through to here we have a valid response, not a fault.
807                # Access denied is a fault, so something better or worse than
808                # access denied has happened.
809                r = r['RequestAccessResponseBody']
810                self.log.debug("[get_access] Access granted")
811                break
812            else:
813                raise service_error(service_error.protocol,
814                        "Bad proxy response")
815       
816        if not r:
817            raise service_error(service_error.access, 
818                    "Access denied by %s (%s)" % (tb, uri))
819
820        if r.has_key('emulab'):
821            e = r['emulab']
822            p = e['project']
823            tbparam[tb] = { 
824                    "boss": e['boss'],
825                    "host": e['ops'],
826                    "domain": e['domain'],
827                    "fs": e['fileServer'],
828                    "eventserver": e['eventServer'],
829                    "project": unpack_id(p['name']),
830                    "emulab" : e,
831                    "allocID" : r['allocID'],
832                    "uri": uri,
833                    }
834            # Make the testbed name be the label the user applied
835            p['testbed'] = {'localname': tb }
836
837            for u in p['user']:
838                role = u.get('role', None)
839                if role == 'experimentCreation':
840                    tbparam[tb]['user'] = unpack_id(u['userID'])
841                    break
842            else:
843                raise service_error(service_error.internal, 
844                        "No createExperimentUser from %s" %tb)
845            # Add attributes to parameter space.  We don't allow attributes to
846            # overlay any parameters already installed.
847            for a in e['fedAttr']:
848                try:
849                    if a['attribute'] and \
850                            isinstance(a['attribute'], basestring)\
851                            and not tbparam[tb].has_key(a['attribute'].lower()):
852                        tbparam[tb][a['attribute'].lower()] = a['value']
853                except KeyError:
854                    self.log.error("Bad attribute in response: %s" % a)
855        else:
856            tbparam[tb] = { 
857                "allocID" : r['allocID'],
858                "uri": uri,
859            }
860
861       
862    def release_access(self, tb, aid, uri=None):
863        """
864        Release access to testbed through fedd
865        """
866
867        if not uri:
868            uri = self.tbmap.get(tb, None)
869        if not uri:
870            raise service_error(service_error.server_config, 
871                    "Unknown testbed: %s" % tb)
872
873        if self.local_access.has_key(uri):
874            resp = self.local_access[uri].ReleaseAccess(\
875                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
876                    fedid(file=self.cert_file))
877            resp = { 'ReleaseAccessResponseBody': resp } 
878        else:
879            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
880                    self.cert_file, self.cert_pwd, self.trusted_certs)
881
882        # better error coding
883
884    def remote_splitter(self, uri, desc, master):
885
886        req = {
887                'description' : { 'ns2description': desc },
888                'master': master,
889                'include_fedkit': bool(self.fedkit),
890                'include_gatewaykit': bool(self.gatewaykit)
891            }
892
893        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
894                self.trusted_certs)
895
896        if r.has_key('Ns2SplitResponseBody'):
897            r = r['Ns2SplitResponseBody']
898            if r.has_key('output'):
899                return r['output'].splitlines()
900            else:
901                raise service_error(service_error.protocol, 
902                        "Bad splitter response (no output)")
903        else:
904            raise service_error(service_error.protocol, "Bad splitter response")
905
906    class start_segment:
907        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
908                cert_pwd=None, trusted_certs=None, caller=None,
909                log_collector=None):
910            self.log = log
911            self.debug = debug
912            self.cert_file = cert_file
913            self.cert_pwd = cert_pwd
914            self.trusted_certs = None
915            self.caller = caller
916            self.testbed = testbed
917            self.log_collector = log_collector
918            self.response = None
919
920        def __call__(self, uri, aid, topo, master, attrs=None):
921            req = {
922                    'allocID': { 'fedid' : aid }, 
923                    'segmentdescription': { 
924                        'topdldescription': topo.to_dict(),
925                    },
926                    'master': master,
927                }
928            if attrs:
929                req['fedAttr'] = attrs
930
931            try:
932                self.log.debug("Calling StartSegment at %s " % uri)
933                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
934                        self.trusted_certs)
935                if r.has_key('StartSegmentResponseBody'):
936                    lval = r['StartSegmentResponseBody'].get('allocationLog',
937                            None)
938                    if lval and self.log_collector:
939                        for line in  lval.splitlines(True):
940                            self.log_collector.write(line)
941                    self.response = r
942                else:
943                    raise service_error(service_error.internal, 
944                            "Bad response!?: %s" %r)
945                return True
946            except service_error, e:
947                self.log.error("Start segment failed on %s: %s" % \
948                        (self.testbed, e))
949                return False
950
951
952
953    class terminate_segment:
954        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
955                cert_pwd=None, trusted_certs=None, caller=None):
956            self.log = log
957            self.debug = debug
958            self.cert_file = cert_file
959            self.cert_pwd = cert_pwd
960            self.trusted_certs = None
961            self.caller = caller
962            self.testbed = testbed
963
964        def __call__(self, uri, aid ):
965            req = {
966                    'allocID': aid , 
967                }
968            try:
969                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
970                        self.trusted_certs)
971                return True
972            except service_error, e:
973                self.log.error("Terminate segment failed on %s: %s" % \
974                        (self.testbed, e))
975                return False
976   
977
978    def allocate_resources(self, allocated, master, eid, expid, expcert, 
979            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
980            attrs=None):
981        def get_vlan(r):
982            if r.has_key('StartSegmentResponseBody'):
983                srb = r['StartSegmentResponseBody']
984                if srb.has_key('fedAttr'):
985                    for k, v in [ (a['attribute'], a['value']) \
986                            for a in srb['fedAttr']]:
987                        if k == 'vlan': return v
988            return None
989
990        started = { }           # Testbeds where a sub-experiment started
991                                # successfully
992
993        # XXX
994        fail_soft = False
995
996        slaves = [ k for k in allocated.keys() \
997                if k != master and not topo[k].get_attribute('transit')]
998        transit = [ k for k in allocated.keys() \
999                if topo[k].get_attribute('transit')]
1000
1001        log = alloc_log or self.log
1002
1003        thread_pool = self.thread_pool(self.nthreads)
1004        threads = [ ]
1005
1006        for tb in transit:
1007            uri = tbparams[tb]['uri']
1008            if tbparams[tb].has_key('allocID') and \
1009                    tbparams[tb]['allocID'].has_key('fedid'):
1010                aid = tbparams[tb]['allocID']['fedid']
1011            else:
1012                raise service_error(service_error.internal, 
1013                        "No alloc id for testbed %s !?" % tb)
1014
1015            m = re.search('(\d+)', tb)
1016            if m:
1017                to_repl = "unassigned%s" % m.group(1)
1018            else:
1019                raise service_error(service_error.internal, 
1020                        "Bad dynamic allocation name")
1021                break
1022
1023            ss = self.start_segment(log=log, debug=self.debug, 
1024                testbed=master, cert_file=self.cert_file, 
1025                cert_pwd=self.cert_pwd, 
1026                trusted_certs=self.trusted_certs,
1027                caller=self.call_StartSegment,
1028                log_collector=log_collector)
1029            t = self.pooled_thread(
1030                    target=ss,
1031                    args =(uri, aid, topo[tb], False, attrs),
1032                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1033            threads.append(t)
1034            t.start()
1035            # Wait until the this transit node finishes (keep pinging the log,
1036            # though)
1037
1038            mins = 0
1039            while not thread_pool.wait_for_all_done(60.0):
1040                mins += 1
1041                alloc_log.info("Waiting for master (it has been %d mins)" \
1042                        % mins)
1043
1044            if t.rv:
1045                vlan = get_vlan(ss.response)
1046                if vlan is not None:
1047                    for k, t in topo.items():
1048                        for e in t.elements:
1049                            for i in e.interface:
1050                                vl = i.get_attribute('dragon_vlan')
1051                                if vl is not None and vl == to_repl:
1052                                    i.set_attribute('dragon_vlan', vlan)
1053            else:
1054                break
1055            thread_pool.clear()
1056
1057
1058        failed = [ t.getName() for t in threads if not t.rv ]
1059
1060        if len(failed) == 0:
1061            for tb in slaves:
1062                # Create and start a thread to start the segment, and save it
1063                # to get the return value later
1064                thread_pool.wait_for_slot()
1065                uri = self.tbmap.get(tb, None)
1066                if not uri:
1067                    raise service_error(service_error.internal, 
1068                            "Unknown testbed %s !?" % tb)
1069
1070                if tbparams[tb].has_key('allocID') and \
1071                        tbparams[tb]['allocID'].has_key('fedid'):
1072                    aid = tbparams[tb]['allocID']['fedid']
1073                else:
1074                    raise service_error(service_error.internal, 
1075                            "No alloc id for testbed %s !?" % tb)
1076
1077                t  = self.pooled_thread(\
1078                        target=self.start_segment(log=log, debug=self.debug,
1079                            testbed=tb, cert_file=self.cert_file,
1080                            cert_pwd=self.cert_pwd,
1081                            trusted_certs=self.trusted_certs,
1082                            caller=self.call_StartSegment,
1083                            log_collector=log_collector), 
1084                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1085                        pdata=thread_pool, trace_file=self.trace_file)
1086                threads.append(t)
1087                t.start()
1088
1089            # Wait until all finish (keep pinging the log, though)
1090            mins = 0
1091            while not thread_pool.wait_for_all_done(60.0):
1092                mins += 1
1093                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1094                        % mins)
1095
1096            thread_pool.clear()
1097
1098        # If none failed, start the master
1099        failed = [ t.getName() for t in threads if not t.rv ]
1100
1101        if len(failed) == 0:
1102            uri = self.tbmap.get(master, None)
1103            if not uri:
1104                raise service_error(service_error.internal, 
1105                        "Unknown testbed %s !?" % master)
1106
1107            if tbparams[master].has_key('allocID') and \
1108                    tbparams[master]['allocID'].has_key('fedid'):
1109                aid = tbparams[master]['allocID']['fedid']
1110            else:
1111                raise service_error(service_error.internal, 
1112                    "No alloc id for testbed %s !?" % master)
1113            t = self.pooled_thread(
1114                    target=self.start_segment(log=log, debug=self.debug, 
1115                        testbed=master, cert_file=self.cert_file, 
1116                        cert_pwd=self.cert_pwd, 
1117                        trusted_certs=self.trusted_certs,
1118                        caller=self.call_StartSegment,
1119                        log_collector=log_collector),
1120                    args =(uri, aid, topo[master], True, attrs),
1121                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1122            threads.append(t)
1123            t.start()
1124            # Wait until the master finishes (keep pinging the log, though)
1125            mins = 0
1126            while not thread_pool.wait_for_all_done(60.0):
1127                mins += 1
1128                alloc_log.info("Waiting for master (it has been %d mins)" \
1129                        % mins)
1130            # update failed to include the master, if it failed
1131            failed = [ t.getName() for t in threads if not t.rv ]
1132
1133        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1134        # If one failed clean up, unless fail_soft is set
1135        if failed:
1136            if not fail_soft:
1137                thread_pool.clear()
1138                for tb in succeeded:
1139                    # Create and start a thread to stop the segment
1140                    thread_pool.wait_for_slot()
1141                    uri = self.tbmap.get(tb, None)
1142                    t  = self.pooled_thread(\
1143                            target=self.terminate_segment(log=log,
1144                                testbed=tb,
1145                                cert_file=self.cert_file, 
1146                                cert_pwd=self.cert_pwd,
1147                                trusted_certs=self.trusted_certs,
1148                                caller=self.call_TerminateSegment),
1149                            args=(uri, tbparams[tb]['federant']['allocID']),
1150                            name=tb,
1151                            pdata=thread_pool, trace_file=self.trace_file)
1152                    t.start()
1153                # Wait until all finish
1154                thread_pool.wait_for_all_done()
1155
1156                # release the allocations
1157                for tb in tbparams.keys():
1158                    self.release_access(tb, tbparams[tb]['allocID'],
1159                            tbparams[tb].get('uri', None))
1160                # Remove the placeholder
1161                self.state_lock.acquire()
1162                self.state[eid]['experimentStatus'] = 'failed'
1163                if self.state_filename: self.write_state()
1164                self.state_lock.release()
1165
1166                log.error("Swap in failed on %s" % ",".join(failed))
1167                return
1168        else:
1169            log.info("[start_segment]: Experiment %s active" % eid)
1170
1171
1172        # Walk up tmpdir, deleting as we go
1173        if self.cleanup:
1174            log.debug("[start_experiment]: removing %s" % tmpdir)
1175            for path, dirs, files in os.walk(tmpdir, topdown=False):
1176                for f in files:
1177                    os.remove(os.path.join(path, f))
1178                for d in dirs:
1179                    os.rmdir(os.path.join(path, d))
1180            os.rmdir(tmpdir)
1181        else:
1182            log.debug("[start_experiment]: not removing %s" % tmpdir)
1183
1184        # Insert the experiment into our state and update the disk copy
1185        self.state_lock.acquire()
1186        self.state[expid]['experimentStatus'] = 'active'
1187        self.state[eid] = self.state[expid]
1188        if self.state_filename: self.write_state()
1189        self.state_lock.release()
1190        return
1191
1192
1193    def add_kit(self, e, kit):
1194        """
1195        Add a Software object created from the list of (install, location)
1196        tuples passed as kit  to the software attribute of an object e.  We
1197        do this enough to break out the code, but it's kind of a hack to
1198        avoid changing the old tuple rep.
1199        """
1200
1201        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1202
1203        if isinstance(e.software, list): e.software.extend(s)
1204        else: e.software = s
1205
1206
1207    def create_experiment_state(self, fid, req, expid, expcert):
1208        """
1209        Create the initial entry in the experiment's state.  The expid and
1210        expcert are the experiment's fedid and certifacte that represents that
1211        ID, which are installed in the experiment state.  If the request
1212        includes a suggested local name that is used if possible.  If the local
1213        name is already taken by an experiment owned by this user that has
1214        failed, it is overwriutten.  Otherwise new letters are added until a
1215        valid localname is found.  The generated local name is returned.
1216        """
1217
1218        if req.has_key('experimentID') and \
1219                req['experimentID'].has_key('localname'):
1220            overwrite = False
1221            eid = req['experimentID']['localname']
1222            # If there's an old failed experiment here with the same local name
1223            # and accessible by this user, we'll overwrite it, otherwise we'll
1224            # fall through and do the collision avoidance.
1225            old_expid = self.get_experiment_fedid(eid)
1226            if old_expid and self.check_experiment_access(fid, old_expid):
1227                self.state_lock.acquire()
1228                status = self.state[eid].get('experimentStatus', None)
1229                if status and status == 'failed':
1230                    # remove the old access attribute
1231                    self.auth.unset_attribute(fid, old_expid)
1232                    overwrite = True
1233                    del self.state[eid]
1234                    del self.state[old_expid]
1235                self.state_lock.release()
1236            self.state_lock.acquire()
1237            while (self.state.has_key(eid) and not overwrite):
1238                eid += random.choice(string.ascii_letters)
1239            # Initial state
1240            self.state[eid] = {
1241                    'experimentID' : \
1242                            [ { 'localname' : eid }, {'fedid': expid } ],
1243                    'experimentStatus': 'starting',
1244                    'experimentAccess': { 'X509' : expcert },
1245                    'owner': fid,
1246                    'log' : [],
1247                }
1248            self.state[expid] = self.state[eid]
1249            if self.state_filename: self.write_state()
1250            self.state_lock.release()
1251        else:
1252            eid = self.exp_stem
1253            for i in range(0,5):
1254                eid += random.choice(string.ascii_letters)
1255            self.state_lock.acquire()
1256            while (self.state.has_key(eid)):
1257                eid = self.exp_stem
1258                for i in range(0,5):
1259                    eid += random.choice(string.ascii_letters)
1260            # Initial state
1261            self.state[eid] = {
1262                    'experimentID' : \
1263                            [ { 'localname' : eid }, {'fedid': expid } ],
1264                    'experimentStatus': 'starting',
1265                    'experimentAccess': { 'X509' : expcert },
1266                    'owner': fid,
1267                    'log' : [],
1268                }
1269            self.state[expid] = self.state[eid]
1270            if self.state_filename: self.write_state()
1271            self.state_lock.release()
1272
1273        return eid
1274
1275
1276    def allocate_ips_to_topo(self, top):
1277        """
1278        Add an ip4_address attribute to all the hosts in the topology, based on
1279        the shared substrates on which they sit.  An /etc/hosts file is also
1280        created and returned as a list of hostfiles entries.  We also return
1281        the allocator, because we may need to allocate IPs to portals
1282        (specifically DRAGON portals).
1283        """
1284        subs = sorted(top.substrates, 
1285                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1286                reverse=True)
1287        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1288        ifs = { }
1289        hosts = [ ]
1290
1291        for idx, s in enumerate(subs):
1292            a = ips.allocate(len(s.interfaces)+2)
1293            if a :
1294                base, num = a
1295                if num < len(s.interfaces) +2 : 
1296                    raise service_error(service_error.internal,
1297                            "Allocator returned wrong number of IPs??")
1298            else:
1299                raise service_error(service_error.req, 
1300                        "Cannot allocate IP addresses")
1301
1302            base += 1
1303            for i in s.interfaces:
1304                i.attribute.append(
1305                        topdl.Attribute('ip4_address', 
1306                            "%s" % ip_addr(base)))
1307                hname = i.element.name[0]
1308                if ifs.has_key(hname):
1309                    hosts.append("%s\t%s-%s %s-%d" % \
1310                            (ip_addr(base), hname, s.name, hname,
1311                                ifs[hname]))
1312                else:
1313                    ifs[hname] = 0
1314                    hosts.append("%s\t%s-%s %s-%d %s" % \
1315                            (ip_addr(base), hname, s.name, hname,
1316                                ifs[hname], hname))
1317
1318                ifs[hname] += 1
1319                base += 1
1320        return hosts, ips
1321
1322    def get_access_to_testbeds(self, testbeds, user, access_user, 
1323            export_project, master, allocated, tbparams):
1324        """
1325        Request access to the various testbeds required for this instantiation
1326        (passed in as testbeds).  User, access_user, expoert_project and master
1327        are used to construct the correct requests.  Per-testbed parameters are
1328        returned in tbparams.
1329        """
1330        for tb in testbeds:
1331            self.get_access(tb, None, user, tbparams, master,
1332                    export_project, access_user)
1333            allocated[tb] = 1
1334
1335    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1336        """
1337        Create the sub-topologies that are needed for experimetn instantiation.
1338        Along the way attach startup commands to the computers in the
1339        subtopologies.
1340        """
1341        for tb in testbeds:
1342            topo[tb] = top.clone()
1343            to_delete = [ ]
1344            for e in topo[tb].elements:
1345                etb = e.get_attribute('testbed')
1346                if etb and etb != tb:
1347                    for i in e.interface:
1348                        for s in i.subs:
1349                            try:
1350                                s.interfaces.remove(i)
1351                            except ValueError:
1352                                raise service_error(service_error.internal,
1353                                        "Can't remove interface??")
1354                    to_delete.append(e)
1355            for e in to_delete:
1356                topo[tb].elements.remove(e)
1357            topo[tb].make_indices()
1358
1359            for e in [ e for e in topo[tb].elements \
1360                    if isinstance(e,topdl.Computer)]:
1361                if tb == master:
1362                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1363                else:
1364                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1365                scmd = e.get_attribute('startup')
1366                if scmd:
1367                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1368
1369                e.set_attribute('startup', cmd)
1370                if self.fedkit: self.add_kit(e, self.fedkit)
1371
1372    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1373            portal_type, iface_desc=()):
1374        sproject = tbparams[st].get('project', 'project')
1375        dproject = tbparams[dt].get('project', 'project')
1376        mproject = tbparams[master].get('project', 'project')
1377        sdomain = tbparams[st].get('domain', ".example.com")
1378        ddomain = tbparams[dt].get('domain', ".example.com")
1379        mdomain = tbparams[master].get('domain', '.example.com')
1380        muser = tbparams[master].get('user', 'root')
1381        smbshare = tbparams[master].get('smbshare', 'USERS')
1382        aid = tbparams[dt]['allocID']['fedid']
1383        if st == master or dt == master:
1384            active = ("%s" % (st == master))
1385        else:
1386            active = ("%s" %(st > dt))
1387
1388        ifaces = [ ]
1389        for sub, attrs in iface_desc:
1390            inf = topdl.Interface(
1391                    substrate=sub,
1392                    attribute=[
1393                        topdl.Attribute(
1394                            attribute=n,
1395                            value = v)
1396                        for n, v in attrs
1397                        ]
1398                    )
1399            ifaces.append(inf)
1400        return topdl.Computer(
1401                name=myname,
1402                attribute=[ 
1403                    topdl.Attribute(attribute=n,value=v)
1404                        for n, v in (\
1405                            ('portal', 'true'),
1406                            ('domain', sdomain),
1407                            ('masterdomain', mdomain),
1408                            ('masterexperiment', "%s/%s" % \
1409                                    (mproject, eid)),
1410                            ('masteruser', muser),
1411                            ('smbshare', smbshare),
1412                            ('experiment', "%s/%s" % \
1413                                    (sproject, eid)),
1414                            ('peer', "%s" % desthost),
1415                            ('peer_segment', "%s" % aid),
1416                            ('scriptdir', 
1417                                "/usr/local/federation/bin"),
1418                            ('active', "%s" % active),
1419                            ('portal_type', portal_type), 
1420                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1421                    ],
1422                interface=ifaces,
1423                )
1424
1425    def new_portal_substrate(self, st, dt, eid, tbparams):
1426        ddomain = tbparams[dt].get('domain', ".example.com")
1427        dproject = tbparams[dt].get('project', 'project')
1428        tsubstrate = \
1429                topdl.Substrate(name='%s-%s' % (st, dt),
1430                        attribute= [
1431                            topdl.Attribute(
1432                                attribute='portal',
1433                                value='true')
1434                            ]
1435                        )
1436        segment_element = topdl.Segment(
1437                id= tbparams[dt]['allocID'],
1438                type='emulab',
1439                uri = self.tbmap.get(dt, None),
1440                interface=[ 
1441                    topdl.Interface(
1442                        substrate=tsubstrate.name),
1443                    ],
1444                attribute = [
1445                    topdl.Attribute(attribute=n, value=v)
1446                        for n, v in (\
1447                            ('domain', ddomain),
1448                            ('experiment', "%s/%s" % \
1449                                    (dproject, eid)),)
1450                    ],
1451                )
1452
1453        return (tsubstrate, segment_element)
1454
1455    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1456        if sub.capacity is None:
1457            raise service_error(service_error.internal,
1458                    "Cannot DRAGON split substrate w/o capacity")
1459        segs = [ ]
1460        substr = topdl.Substrate(name="dragon%d" % idx, 
1461                capacity=sub.capacity.clone(),
1462                attribute=[ topdl.Attribute(attribute=n, value=v)
1463                    for n, v, in (\
1464                            ('vlan', 'unassigned%d' % idx),)])
1465        for tb in tbs.keys():
1466            seg = topdl.Segment(
1467                    id = tbparams[tb]['allocID'],
1468                    type='emulab',
1469                    uri = self.tbmap.get(tb, None),
1470                    interface=[ 
1471                        topdl.Interface(
1472                            substrate=substr.name),
1473                        ],
1474                    attribute=[ topdl.Attribute(
1475                        attribute='dragon_endpoint', 
1476                        value=tbparams[tb]['dragon']),
1477                        ]
1478                    )
1479            if tbparams[tb].has_key('vlans'):
1480                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1481            segs.append(seg)
1482
1483        topo["dragon%d" %idx] = \
1484                topdl.Topology(substrates=[substr], elements=segs,
1485                        attribute=[
1486                            topdl.Attribute(attribute="transit", value='true'),
1487                            topdl.Attribute(attribute="dynamic", value='true'),
1488                            topdl.Attribute(attribute="testbed", value='dragon'),
1489                            ]
1490                        )
1491
1492    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1493        """
1494        Add attribiutes to the various elements indicating that they are to be
1495        dragon connected and create a dragon segment in tops to be
1496        instantiated.
1497        """
1498
1499        def get_substrate_from_topo(name, t):
1500            for s in t.substrates:
1501                if s.name == name: return s
1502            else: return None
1503
1504        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1505        elements = [ i.element for i in sub.interfaces ]
1506        count = { }
1507        for e in elements:
1508            tb = e.get_attribute('testbed')
1509            count[tb] = count.get(tb, 0) + 1
1510
1511        for tb in tbs.keys():
1512            s = get_substrate_from_topo(sub.name, topo[tb])
1513            if s:
1514                for i in s.interfaces:
1515                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1516                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1517                    else: i.set_attribute('dragon_type', 'link')
1518            else:
1519                raise service_error(service_error.internal,
1520                        "No substrate %s in testbed %s" % (sub.name, tb))
1521
1522        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1523
1524    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1525            segment_substrate, portals):
1526        # More than one testbed is on this substrate.  Insert
1527        # some portals into the subtopologies.  st == source testbed,
1528        # dt == destination testbed.
1529        for st in tbs.keys():
1530            if not segment_substrate.has_key(st):
1531                segment_substrate[st] = { }
1532            if not portals.has_key(st): 
1533                portals[st] = { }
1534            for dt in [ t for t in tbs.keys() if t != st]:
1535                sproject = tbparams[st].get('project', 'project')
1536                dproject = tbparams[dt].get('project', 'project')
1537                mproject = tbparams[master].get('project', 'project')
1538                sdomain = tbparams[st].get('domain', ".example.com")
1539                ddomain = tbparams[dt].get('domain', ".example.com")
1540                mdomain = tbparams[master].get('domain', '.example.com')
1541                muser = tbparams[master].get('user', 'root')
1542                smbshare = tbparams[master].get('smbshare', 'USERS')
1543                aid = tbparams[dt]['allocID']['fedid']
1544                if st == master or dt == master:
1545                    active = ("%s" % (st == master))
1546                else:
1547                    active = ("%s" %(st > dt))
1548                if not segment_substrate[st].has_key(dt):
1549                    # Put a substrate and a segment for the connected
1550                    # testbed in there.
1551                    tsubstrate, segment_element = \
1552                            self.new_portal_substrate(st, dt, eid, tbparams)
1553                    segment_substrate[st][dt] = tsubstrate
1554                    topo[st].substrates.append(tsubstrate)
1555                    topo[st].elements.append(segment_element)
1556
1557                new_portal = False
1558                if portals[st].has_key(dt):
1559                    # There's a portal set up to go to this destination.
1560                    # See if there's room to multiples this connection on
1561                    # it.  If so, add an interface to the portal; if not,
1562                    # set up to add a portal below.
1563                    # [This little festival of braces is just a pop of the
1564                    # last element in the list of portals between st and
1565                    # dt.]
1566                    portal = portals[st][dt][-1]
1567                    mux = len([ i for i in portal.interface \
1568                            if not i.get_attribute('portal')])
1569                    if mux == self.muxmax:
1570                        new_portal = True
1571                        portal_type = "experiment"
1572                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1573                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1574                    else:
1575                        new_i = topdl.Interface(
1576                                substrate=s.name,
1577                                attribute=[ 
1578                                    topdl.Attribute(
1579                                        attribute='ip4_address', 
1580                                        value=tbs[dt]
1581                                    )
1582                                ])
1583                        portal.interface.append(new_i)
1584                else:
1585                    # First connection to this testbed, make an empty list
1586                    # and set up to add the new portal below
1587                    new_portal = True
1588                    portals[st][dt] = [ ]
1589                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1590                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1591
1592                    if dt == master or st == master: portal_type = "both"
1593                    else: portal_type = "experiment"
1594
1595                if new_portal:
1596                    infs = (
1597                            (segment_substrate[st][dt].name, 
1598                                (('portal', 'true'),)),
1599                            (sub.name, 
1600                                (('ip4_address', tbs[dt]),))
1601                        )
1602                    portal =  self.new_portal_node(st, dt, tbparams, 
1603                            master, eid, myname, desthost, portal_type,
1604                            infs)
1605                    if self.fedkit:
1606                        self.add_kit(portal, self.fedkit)
1607                    if self.gatewaykit: 
1608                        self.add_kit(portal, self.gatewaykit)
1609
1610                    topo[st].elements.append(portal)
1611                    portals[st][dt].append(portal)
1612
1613    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1614        # Add to the master testbed
1615        tsubstrate, segment_element = \
1616                self.new_portal_substrate(st, dt, eid, tbparams)
1617        myname = "%stunnel" % dt
1618        desthost = "%stunnel" % st
1619
1620        portal = self.new_portal_node(st, dt, tbparams, master,
1621                eid, myname, desthost, "control", 
1622                ((tsubstrate.name,(('portal','true'),)),))
1623        if self.fedkit:
1624            self.add_kit(portal, self.fedkit)
1625        if self.gatewaykit: 
1626            self.add_kit(portal, self.gatewaykit)
1627
1628        topo[st].substrates.append(tsubstrate)
1629        topo[st].elements.append(segment_element)
1630        topo[st].elements.append(portal)
1631
1632    def new_dragon_portal(self, st, dt, master, eid, dip, idx, 
1633            substrate, tbparams):
1634        # Add to the master testbed
1635        myname = "%stunnel" % dt
1636        desthost = "%s" % ip_addr(dip)
1637
1638        portal = self.new_portal_node(st, dt, tbparams, master,
1639                eid, myname, desthost, "control", 
1640                ((substrate.name,(
1641                    ('portal','true'),
1642                    ('ip4_address', "%s" % ip_addr(dip)),
1643                    ('dragon_vlan', 'unassigned%d' % idx),
1644                    ('dragon_type', 'link'),)),))
1645        if self.fedkit:
1646            self.add_kit(portal, self.fedkit)
1647        if self.gatewaykit: 
1648            self.add_kit(portal, self.gatewaykit)
1649
1650        return portal
1651
1652    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1653        """
1654        For each substrate in the main topology, find those that
1655        have nodes on more than one testbed.  Insert portal nodes
1656        into the copies of those substrates on the sub topologies.
1657        """
1658        segment_substrate = { }
1659        portals = { }
1660        for s in top.substrates:
1661            # tbs will contain an ip address on this subsrate that is in
1662            # each testbed.
1663            tbs = { }
1664            for i in s.interfaces:
1665                e = i.element
1666                tb = e.get_attribute('testbed')
1667                if tb and not tbs.has_key(tb):
1668                    for i in e.interface:
1669                        if s in i.subs:
1670                            tbs[tb]= i.get_attribute('ip4_address')
1671            if len(tbs) < 2:
1672                continue
1673
1674            # DRAGON will not create multi-site vlans yet
1675            if len(tbs) == 2 and \
1676                    all([tbparams[x].has_key('dragon') for x in tbs]):
1677                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1678                        master, eid)
1679            else:
1680                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1681                        eid, segment_substrate, portals)
1682
1683        # Make sure that all the slaves have a control portal back to the
1684        # master.
1685        for tb in [ t for t in tbparams.keys() if t != master ]:
1686            if len([e for e in topo[tb].elements \
1687                    if isinstance(e, topdl.Computer) and \
1688                    e.get_attribute('portal') and \
1689                    e.get_attribute('portal_type') == 'both']) == 0:
1690
1691                if tbparams[master].has_key('dragon') \
1692                        and tbparams[tb].has_key('dragon'):
1693
1694                    idx = len([x for x in topo.keys() \
1695                            if x.startswith('dragon')])
1696                    dip, leng = ip_allocator.allocate(4)
1697                    dip += 1
1698                    csub = topdl.Substrate(
1699                            name="dragon-control-%s" % tb,
1700                            capacity=topdl.Capacity(100000.0, 'max'),
1701                            attribute=[
1702                                topdl.Attribute(
1703                                    attribute='portal',
1704                                    value='true'
1705                                    )
1706                                ]
1707                            )
1708                    seg = topdl.Segment(
1709                            id= tbparams[master]['allocID'],
1710                            type='emulab',
1711                            uri = self.tbmap.get(master, None),
1712                            interface=[ 
1713                                topdl.Interface(
1714                                    substrate=csub.name),
1715                                ],
1716                            attribute = [
1717                                topdl.Attribute(attribute=n, value=v)
1718                                    for n, v in (\
1719                                        ('domain', 
1720                                            tbparams[master].get('domain',
1721                                                ".example.com")),
1722                                        ('experiment', "%s/%s" % \
1723                                                (tbparams[master].get(
1724                                                    'project', 
1725                                                    'project'), 
1726                                                    eid)),)
1727                                ],
1728                            )
1729                    topo[tb].substrates.append(csub)
1730                    topo[tb].elements.append(
1731                            self.new_dragon_portal(tb, master, master, eid, 
1732                                dip, idx, csub, tbparams))
1733                    topo[tb].elements.append(seg)
1734
1735                    dip+=1
1736                    mcsub = csub.clone()
1737                    seg = topdl.Segment(
1738                            id= tbparams[tb]['allocID'],
1739                            type='emulab',
1740                            uri = self.tbmap.get(tb, None),
1741                            interface=[ 
1742                                topdl.Interface(
1743                                    substrate=csub.name),
1744                                ],
1745                            attribute = [
1746                                topdl.Attribute(attribute=n, value=v)
1747                                    for n, v in (\
1748                                        ('domain', 
1749                                            tbparams[tb].get('domain',
1750                                                ".example.com")),
1751                                        ('experiment', "%s/%s" % \
1752                                                (tbparams[tb].get('project', 
1753                                                    'project'), 
1754                                                    eid)),)
1755                                ],
1756                            )
1757                    topo[master].substrates.append(mcsub)
1758                    topo[master].elements.append(
1759                            self.new_dragon_portal(master, tb, master, eid, 
1760                                dip, idx, mcsub, tbparams))
1761                    topo[master].elements.append(seg)
1762
1763                    self.create_dragon_substrate(csub, topo, 
1764                            {tb: 1, master:1}, tbparams, master, eid)
1765                else:
1766                    self.add_control_portal(master, tb, master, eid, topo, 
1767                            tbparams)
1768                    self.add_control_portal(tb, master, master, eid, topo, 
1769                            tbparams)
1770
1771        # Connect the portal nodes into the topologies and clear out
1772        # substrates that are not in the topologies
1773        for tb in tbparams.keys():
1774            topo[tb].incorporate_elements()
1775            topo[tb].substrates = \
1776                    [s for s in topo[tb].substrates \
1777                        if len(s.interfaces) >0]
1778
1779    def wrangle_software(self, expid, top, topo, tbparams):
1780        """
1781        Copy software out to the repository directory, allocate permissions and
1782        rewrite the segment topologies to look for the software in local
1783        places.
1784        """
1785
1786        # Copy the rpms and tarfiles to a distribution directory from
1787        # which the federants can retrieve them
1788        linkpath = "%s/software" %  expid
1789        softdir ="%s/%s" % ( self.repodir, linkpath)
1790        softmap = { }
1791        # These are in a list of tuples format (each kit).  This comprehension
1792        # unwraps them into a single list of tuples that initilaizes the set of
1793        # tuples.
1794        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1795                for p, t in l ])
1796        pkgs.update([x.location for e in top.elements \
1797                for x in e.software])
1798        try:
1799            os.makedirs(softdir)
1800        except IOError, e:
1801            raise service_error(
1802                    "Cannot create software directory: %s" % e)
1803        # The actual copying.  Everything's converted into a url for copying.
1804        for pkg in pkgs:
1805            loc = pkg
1806
1807            scheme, host, path = urlparse(loc)[0:3]
1808            dest = os.path.basename(path)
1809            if not scheme:
1810                if not loc.startswith('/'):
1811                    loc = "/%s" % loc
1812                loc = "file://%s" %loc
1813            try:
1814                u = urlopen(loc)
1815            except Exception, e:
1816                raise service_error(service_error.req, 
1817                        "Cannot open %s: %s" % (loc, e))
1818            try:
1819                f = open("%s/%s" % (softdir, dest) , "w")
1820                self.log.debug("Writing %s/%s" % (softdir,dest) )
1821                data = u.read(4096)
1822                while data:
1823                    f.write(data)
1824                    data = u.read(4096)
1825                f.close()
1826                u.close()
1827            except Exception, e:
1828                raise service_error(service_error.internal,
1829                        "Could not copy %s: %s" % (loc, e))
1830            path = re.sub("/tmp", "", linkpath)
1831            # XXX
1832            softmap[pkg] = \
1833                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1834                    ( path, dest)
1835
1836            # Allow the individual segments to access the software.
1837            for tb in tbparams.keys():
1838                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1839                        "/%s/%s" % ( path, dest))
1840
1841        # Convert the software locations in the segments into the local
1842        # copies on this host
1843        for soft in [ s for tb in topo.values() \
1844                for e in tb.elements \
1845                    if getattr(e, 'software', False) \
1846                        for s in e.software ]:
1847            if softmap.has_key(soft.location):
1848                soft.location = softmap[soft.location]
1849
1850
1851    def create_experiment(self, req, fid):
1852        """
1853        The external interface to experiment creation called from the
1854        dispatcher.
1855
1856        Creates a working directory, splits the incoming description using the
1857        splitter script and parses out the avrious subsections using the
1858        lcasses above.  Once each sub-experiment is created, use pooled threads
1859        to instantiate them and start it all up.
1860        """
1861        if not self.auth.check_attribute(fid, 'create'):
1862            raise service_error(service_error.access, "Create access denied")
1863
1864        try:
1865            tmpdir = tempfile.mkdtemp(prefix="split-")
1866            os.mkdir(tmpdir+"/keys")
1867        except IOError:
1868            raise service_error(service_error.internal, "Cannot create tmp dir")
1869
1870        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1871        gw_secretkey_base = "fed.%s" % self.ssh_type
1872        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1873        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1874        tclfile = tmpdir + "/experiment.tcl"
1875        tbparams = { }
1876        try:
1877            access_user = self.accessdb[fid]
1878        except KeyError:
1879            raise service_error(service_error.internal,
1880                    "Access map and authorizer out of sync in " + \
1881                            "create_experiment for fedid %s"  % fid)
1882
1883        pid = "dummy"
1884        gid = "dummy"
1885
1886        req = req.get('CreateRequestBody', None)
1887        if not req:
1888            raise service_error(service_error.req,
1889                    "Bad request format (no CreateRequestBody)")
1890        # The tcl parser needs to read a file so put the content into that file
1891        descr=req.get('experimentdescription', None)
1892        if descr:
1893            file_content=descr.get('ns2description', None)
1894            if file_content:
1895                try:
1896                    f = open(tclfile, 'w')
1897                    f.write(file_content)
1898                    f.close()
1899                except IOError:
1900                    raise service_error(service_error.internal,
1901                            "Cannot write temp experiment description")
1902            else:
1903                raise service_error(service_error.req, 
1904                        "Only ns2descriptions supported")
1905        else:
1906            raise service_error(service_error.req, "No experiment description")
1907
1908        # Generate an ID for the experiment (slice) and a certificate that the
1909        # allocator can use to prove they own it.  We'll ship it back through
1910        # the encrypted connection.
1911        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1912
1913        eid = self.create_experiment_state(fid, req, expid, expcert)
1914        try: 
1915            # This catches exceptions to clear the placeholder if necessary
1916            try:
1917                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1918            except ValueError:
1919                raise service_error(service_error.server_config, 
1920                        "Bad key type (%s)" % self.ssh_type)
1921
1922            user = req.get('user', None)
1923            if user == None:
1924                raise service_error(service_error.req, "No user")
1925
1926            master = req.get('master', None)
1927            if not master:
1928                raise service_error(service_error.req,
1929                        "No master testbed label")
1930            export_project = req.get('exportProject', None)
1931            if not export_project:
1932                raise service_error(service_error.req, "No export project")
1933           
1934            # Translate to topdl
1935            if self.splitter_url:
1936                # XXX: need remote topdl translator
1937                self.log.debug("Calling remote splitter at %s" % \
1938                        self.splitter_url)
1939                split_data = self.remote_splitter(self.splitter_url,
1940                        file_content, master)
1941            else:
1942                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1943                    str(self.muxmax), '-m', master]
1944
1945                if self.fedkit:
1946                    tclcmd.append('-k')
1947
1948                if self.gatewaykit:
1949                    tclcmd.append('-K')
1950
1951                tclcmd.extend([pid, gid, eid, tclfile])
1952
1953                self.log.debug("running local splitter %s", " ".join(tclcmd))
1954                # This is just fantastic.  As a side effect the parser copies
1955                # tb_compat.tcl into the current directory, so that directory
1956                # must be writable by the fedd user.  Doing this in the
1957                # temporary subdir ensures this is the case.
1958                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1959                        cwd=tmpdir)
1960                split_data = tclparser.stdout
1961
1962            top = topdl.topology_from_xml(file=split_data, top="experiment")
1963
1964            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1965             # Find the testbeds to look up
1966            testbeds = set([ a.value for e in top.elements \
1967                    for a in e.attribute \
1968                    if a.attribute == 'testbed'] )
1969
1970            allocated = { }         # Testbeds we can access
1971            topo ={ }               # Sub topologies
1972            self.get_access_to_testbeds(testbeds, user, access_user, 
1973                    export_project, master, allocated, tbparams)
1974            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1975
1976            # Copy configuration files into the remote file store
1977            # The config urlpath
1978            configpath = "/%s/config" % expid
1979            # The config file system location
1980            configdir ="%s%s" % ( self.repodir, configpath)
1981            try:
1982                os.makedirs(configdir)
1983            except IOError, e:
1984                raise service_error(
1985                        "Cannot create config directory: %s" % e)
1986            try:
1987                f = open("%s/hosts" % configdir, "w")
1988                f.write('\n'.join(hosts))
1989                f.close()
1990            except IOError, e:
1991                raise service_error(service_error.internal, 
1992                        "Cannot write hosts file: %s" % e)
1993            try:
1994                copy_file("%s" % gw_pubkey, "%s/%s" % \
1995                        (configdir, gw_pubkey_base))
1996                copy_file("%s" % gw_secretkey, "%s/%s" % \
1997                        (configdir, gw_secretkey_base))
1998            except IOError, e:
1999                raise service_error(service_error.internal, 
2000                        "Cannot copy keyfiles: %s" % e)
2001
2002            # Allow the individual testbeds to access the configuration files.
2003            for tb in tbparams.keys():
2004                asignee = tbparams[tb]['allocID']['fedid']
2005                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2006                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2007
2008            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2009            # Now get access to the dynamic testbeds
2010            for k, t in topo.items():
2011                if not t.get_attribute('dynamic'):
2012                    continue
2013                tb = t.get_attribute('testbed')
2014                if tb: 
2015                    self.get_access(tb, None, user, tbparams, master, 
2016                            export_project, access_user)
2017                    tbparams[k] = tbparams[tb]
2018                    del tbparams[tb]
2019                    allocated[k] = 1
2020                else:
2021                    raise service_error(service_error.internal, 
2022                            "Dynamic allocation from no testbed!?")
2023
2024            self.wrangle_software(expid, top, topo, tbparams)
2025
2026            vtopo = topdl.topology_to_vtopo(top)
2027            vis = self.genviz(vtopo)
2028
2029            # save federant information
2030            for k in allocated.keys():
2031                tbparams[k]['federant'] = {
2032                        'name': [ { 'localname' : eid} ],
2033                        'allocID' : tbparams[k]['allocID'],
2034                        'master' : k == master,
2035                        'uri': tbparams[k]['uri'],
2036                    }
2037                if tbparams[k].has_key('emulab'):
2038                        tbparams[k]['federant']['emulab'] = \
2039                                tbparams[k]['emulab']
2040
2041            self.state_lock.acquire()
2042            self.state[eid]['vtopo'] = vtopo
2043            self.state[eid]['vis'] = vis
2044            self.state[expid]['federant'] = \
2045                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2046                        if tbparams[tb].has_key('federant') ]
2047            if self.state_filename: 
2048                self.write_state()
2049            self.state_lock.release()
2050        except service_error, e:
2051            # If something goes wrong in the parse (usually an access error)
2052            # clear the placeholder state.  From here on out the code delays
2053            # exceptions.  Failing at this point returns a fault to the remote
2054            # caller.
2055
2056            self.state_lock.acquire()
2057            del self.state[eid]
2058            del self.state[expid]
2059            if self.state_filename: self.write_state()
2060            self.state_lock.release()
2061            raise e
2062
2063
2064        # Start the background swapper and return the starting state.  From
2065        # here on out, the state will stick around a while.
2066
2067        # Let users touch the state
2068        self.auth.set_attribute(fid, expid)
2069        self.auth.set_attribute(expid, expid)
2070        # Override fedids can manipulate state as well
2071        for o in self.overrides:
2072            self.auth.set_attribute(o, expid)
2073
2074        # Create a logger that logs to the experiment's state object as well as
2075        # to the main log file.
2076        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2077        alloc_collector = self.list_log(self.state[eid]['log'])
2078        h = logging.StreamHandler(alloc_collector)
2079        # XXX: there should be a global one of these rather than repeating the
2080        # code.
2081        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2082                    '%d %b %y %H:%M:%S'))
2083        alloc_log.addHandler(h)
2084       
2085        # XXX
2086        url_base = 'https://users.isi.deterlab.net:23232'
2087        attrs = [ 
2088                {
2089                    'attribute': 'ssh_pubkey', 
2090                    'value': '%s/%s/config/%s' % \
2091                            (url_base, expid, gw_pubkey_base)
2092                },
2093                {
2094                    'attribute': 'ssh_secretkey', 
2095                    'value': '%s/%s/config/%s' % \
2096                            (url_base, expid, gw_secretkey_base)
2097                },
2098                {
2099                    'attribute': 'hosts', 
2100                    'value': '%s/%s/config/hosts' % \
2101                            (url_base, expid)
2102                },
2103                {
2104                    'attribute': 'experiment_name',
2105                    'value': eid,
2106                },
2107            ]
2108
2109        # Start a thread to do the resource allocation
2110        t  = Thread(target=self.allocate_resources,
2111                args=(allocated, master, eid, expid, expcert, tbparams, 
2112                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2113                name=eid)
2114        t.start()
2115
2116        rv = {
2117                'experimentID': [
2118                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2119                ],
2120                'experimentStatus': 'starting',
2121                'experimentAccess': { 'X509' : expcert }
2122            }
2123
2124        return rv
2125   
2126    def get_experiment_fedid(self, key):
2127        """
2128        find the fedid associated with the localname key in the state database.
2129        """
2130
2131        rv = None
2132        self.state_lock.acquire()
2133        if self.state.has_key(key):
2134            if isinstance(self.state[key], dict):
2135                try:
2136                    kl = [ f['fedid'] for f in \
2137                            self.state[key]['experimentID']\
2138                                if f.has_key('fedid') ]
2139                except KeyError:
2140                    self.state_lock.release()
2141                    raise service_error(service_error.internal, 
2142                            "No fedid for experiment %s when getting "+\
2143                                    "fedid(!?)" % key)
2144                if len(kl) == 1:
2145                    rv = kl[0]
2146                else:
2147                    self.state_lock.release()
2148                    raise service_error(service_error.internal, 
2149                            "multiple fedids for experiment %s when " +\
2150                                    "getting fedid(!?)" % key)
2151            else:
2152                self.state_lock.release()
2153                raise service_error(service_error.internal, 
2154                        "Unexpected state for %s" % key)
2155        self.state_lock.release()
2156        return rv
2157
2158    def check_experiment_access(self, fid, key):
2159        """
2160        Confirm that the fid has access to the experiment.  Though a request
2161        may be made in terms of a local name, the access attribute is always
2162        the experiment's fedid.
2163        """
2164        if not isinstance(key, fedid):
2165            key = self.get_experiment_fedid(key)
2166
2167        if self.auth.check_attribute(fid, key):
2168            return True
2169        else:
2170            raise service_error(service_error.access, "Access Denied")
2171
2172
2173    def get_handler(self, path, fid):
2174        if self.auth.check_attribute(fid, path):
2175            return ("%s/%s" % (self.repodir, path), "application/binary")
2176        else:
2177            return (None, None)
2178
2179    def get_vtopo(self, req, fid):
2180        """
2181        Return the stored virtual topology for this experiment
2182        """
2183        rv = None
2184        state = None
2185
2186        req = req.get('VtopoRequestBody', None)
2187        if not req:
2188            raise service_error(service_error.req,
2189                    "Bad request format (no VtopoRequestBody)")
2190        exp = req.get('experiment', None)
2191        if exp:
2192            if exp.has_key('fedid'):
2193                key = exp['fedid']
2194                keytype = "fedid"
2195            elif exp.has_key('localname'):
2196                key = exp['localname']
2197                keytype = "localname"
2198            else:
2199                raise service_error(service_error.req, "Unknown lookup type")
2200        else:
2201            raise service_error(service_error.req, "No request?")
2202
2203        self.check_experiment_access(fid, key)
2204
2205        self.state_lock.acquire()
2206        if self.state.has_key(key):
2207            if self.state[key].has_key('vtopo'):
2208                rv = { 'experiment' : {keytype: key },\
2209                        'vtopo': self.state[key]['vtopo'],\
2210                    }
2211            else:
2212                state = self.state[key]['experimentStatus']
2213        self.state_lock.release()
2214
2215        if rv: return rv
2216        else: 
2217            if state:
2218                raise service_error(service_error.partial, 
2219                        "Not ready: %s" % state)
2220            else:
2221                raise service_error(service_error.req, "No such experiment")
2222
2223    def get_vis(self, req, fid):
2224        """
2225        Return the stored visualization for this experiment
2226        """
2227        rv = None
2228        state = None
2229
2230        req = req.get('VisRequestBody', None)
2231        if not req:
2232            raise service_error(service_error.req,
2233                    "Bad request format (no VisRequestBody)")
2234        exp = req.get('experiment', None)
2235        if exp:
2236            if exp.has_key('fedid'):
2237                key = exp['fedid']
2238                keytype = "fedid"
2239            elif exp.has_key('localname'):
2240                key = exp['localname']
2241                keytype = "localname"
2242            else:
2243                raise service_error(service_error.req, "Unknown lookup type")
2244        else:
2245            raise service_error(service_error.req, "No request?")
2246
2247        self.check_experiment_access(fid, key)
2248
2249        self.state_lock.acquire()
2250        if self.state.has_key(key):
2251            if self.state[key].has_key('vis'):
2252                rv =  { 'experiment' : {keytype: key },\
2253                        'vis': self.state[key]['vis'],\
2254                        }
2255            else:
2256                state = self.state[key]['experimentStatus']
2257        self.state_lock.release()
2258
2259        if rv: return rv
2260        else:
2261            if state:
2262                raise service_error(service_error.partial, 
2263                        "Not ready: %s" % state)
2264            else:
2265                raise service_error(service_error.req, "No such experiment")
2266
2267    def clean_info_response(self, rv):
2268        """
2269        Remove the information in the experiment's state object that is not in
2270        the info response.
2271        """
2272        # Remove the owner info (should always be there, but...)
2273        if rv.has_key('owner'): del rv['owner']
2274
2275        # Convert the log into the allocationLog parameter and remove the
2276        # log entry (with defensive programming)
2277        if rv.has_key('log'):
2278            rv['allocationLog'] = "".join(rv['log'])
2279            del rv['log']
2280        else:
2281            rv['allocationLog'] = ""
2282
2283        if rv['experimentStatus'] != 'active':
2284            if rv.has_key('federant'): del rv['federant']
2285        else:
2286            # remove the allocationID and uri info from each federant
2287            for f in rv.get('federant', []):
2288                if f.has_key('allocID'): del f['allocID']
2289                if f.has_key('uri'): del f['uri']
2290        return rv
2291
2292    def get_info(self, req, fid):
2293        """
2294        Return all the stored info about this experiment
2295        """
2296        rv = None
2297
2298        req = req.get('InfoRequestBody', None)
2299        if not req:
2300            raise service_error(service_error.req,
2301                    "Bad request format (no InfoRequestBody)")
2302        exp = req.get('experiment', None)
2303        if exp:
2304            if exp.has_key('fedid'):
2305                key = exp['fedid']
2306                keytype = "fedid"
2307            elif exp.has_key('localname'):
2308                key = exp['localname']
2309                keytype = "localname"
2310            else:
2311                raise service_error(service_error.req, "Unknown lookup type")
2312        else:
2313            raise service_error(service_error.req, "No request?")
2314
2315        self.check_experiment_access(fid, key)
2316
2317        # The state may be massaged by the service function that called
2318        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2319        # state.
2320        self.state_lock.acquire()
2321        if self.state.has_key(key):
2322            rv = copy.deepcopy(self.state[key])
2323        self.state_lock.release()
2324
2325        if rv:
2326            return self.clean_info_response(rv)
2327        else:
2328            raise service_error(service_error.req, "No such experiment")
2329
2330    def get_multi_info(self, req, fid):
2331        """
2332        Return all the stored info that this fedid can access
2333        """
2334        rv = { 'info': [ ] }
2335
2336        self.state_lock.acquire()
2337        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2338            self.check_experiment_access(fid, key)
2339
2340            if self.state.has_key(key):
2341                e = copy.deepcopy(self.state[key])
2342                e = self.clean_info_response(e)
2343                rv['info'].append(e)
2344        self.state_lock.release()
2345        return rv
2346
2347    def terminate_experiment(self, req, fid):
2348        """
2349        Swap this experiment out on the federants and delete the shared
2350        information
2351        """
2352        tbparams = { }
2353        req = req.get('TerminateRequestBody', None)
2354        if not req:
2355            raise service_error(service_error.req,
2356                    "Bad request format (no TerminateRequestBody)")
2357        force = req.get('force', False)
2358        exp = req.get('experiment', None)
2359        if exp:
2360            if exp.has_key('fedid'):
2361                key = exp['fedid']
2362                keytype = "fedid"
2363            elif exp.has_key('localname'):
2364                key = exp['localname']
2365                keytype = "localname"
2366            else:
2367                raise service_error(service_error.req, "Unknown lookup type")
2368        else:
2369            raise service_error(service_error.req, "No request?")
2370
2371        self.check_experiment_access(fid, key)
2372
2373        dealloc_list = [ ]
2374
2375
2376        # Create a logger that logs to the dealloc_list as well as to the main
2377        # log file.
2378        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2379        h = logging.StreamHandler(self.list_log(dealloc_list))
2380        # XXX: there should be a global one of these rather than repeating the
2381        # code.
2382        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2383                    '%d %b %y %H:%M:%S'))
2384        dealloc_log.addHandler(h)
2385
2386        self.state_lock.acquire()
2387        fed_exp = self.state.get(key, None)
2388
2389        if fed_exp:
2390            # This branch of the conditional holds the lock to generate a
2391            # consistent temporary tbparams variable to deallocate experiments.
2392            # It releases the lock to do the deallocations and reacquires it to
2393            # remove the experiment state when the termination is complete.
2394
2395            # First make sure that the experiment creation is complete.
2396            status = fed_exp.get('experimentStatus', None)
2397
2398            if status:
2399                if status in ('starting', 'terminating'):
2400                    if not force:
2401                        self.state_lock.release()
2402                        raise service_error(service_error.partial, 
2403                                'Experiment still being created or destroyed')
2404                    else:
2405                        self.log.warning('Experiment in %s state ' % status + \
2406                                'being terminated by force.')
2407            else:
2408                # No status??? trouble
2409                self.state_lock.release()
2410                raise service_error(service_error.internal,
2411                        "Experiment has no status!?")
2412
2413            ids = []
2414            #  experimentID is a list of dicts that are self-describing
2415            #  identifiers.  This finds all the fedids and localnames - the
2416            #  keys of self.state - and puts them into ids.
2417            for id in fed_exp.get('experimentID', []):
2418                if id.has_key('fedid'): ids.append(id['fedid'])
2419                if id.has_key('localname'): ids.append(id['localname'])
2420
2421            # Collect the allocation/segment ids
2422            for fed in fed_exp.get('federant', []):
2423                try:
2424                    tb = fed['uri']
2425                    aid = fed['allocID']
2426                except KeyError, e:
2427                    continue
2428                tbparams[tb] = aid
2429            fed_exp['experimentStatus'] = 'terminating'
2430            if self.state_filename: self.write_state()
2431            self.state_lock.release()
2432
2433            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2434            # then completes, so we can't wait if nothing starts.  So, no
2435            # tbparams, no start.
2436            if len(tbparams) > 0:
2437                thread_pool = self.thread_pool(self.nthreads)
2438                for tb in tbparams.keys():
2439                    # Create and start a thread to stop the segment
2440                    thread_pool.wait_for_slot()
2441                    #uri = self.tbmap.get(tb, None)
2442                    uri = tb
2443                    t  = self.pooled_thread(\
2444                            target=self.terminate_segment(log=dealloc_log,
2445                                testbed=tb,
2446                                cert_file=self.cert_file, 
2447                                cert_pwd=self.cert_pwd,
2448                                trusted_certs=self.trusted_certs,
2449                                caller=self.call_TerminateSegment),
2450                            args=(uri, tbparams[tb]), name=tb,
2451                            pdata=thread_pool, trace_file=self.trace_file)
2452                    t.start()
2453                # Wait for completions
2454                thread_pool.wait_for_all_done()
2455
2456            # release the allocations (failed experiments have done this
2457            # already, and starting experiments may be in odd states, so we
2458            # ignore errors releasing those allocations
2459            try: 
2460                for tb in tbparams.keys():
2461                    # This releases access by uri
2462                    self.release_access(None, tbparams[tb], uri=tb)
2463            except service_error, e:
2464                if status != 'failed' and not force:
2465                    raise e
2466
2467            # Remove the terminated experiment
2468            self.state_lock.acquire()
2469            for id in ids:
2470                if self.state.has_key(id): del self.state[id]
2471
2472            if self.state_filename: self.write_state()
2473            self.state_lock.release()
2474
2475            return { 
2476                    'experiment': exp , 
2477                    'deallocationLog': "".join(dealloc_list),
2478                    }
2479        else:
2480            # Don't forget to release the lock
2481            self.state_lock.release()
2482            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.