source: fedd/federation/experiment_control.py @ 5b74b63

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5b74b63 was 5b74b63, checked in by Ted Faber <faber@…>, 15 years ago

Initial commit of support for transit connection info

  • Property mode set to 100644
File size: 87.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, tbparam, master, export_project,
728            access_user, services):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # Tweak search order so that if there are entries in access_user that
738        # have a project matching the export project, we try them first
739        if export_project and export_project.has_key('localname'): 
740            pn = export_project['localname'] 
741               
742            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
743            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
744        else: 
745            access_sequence = access_user
746
747        for p, u in access_sequence: 
748            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
749                    "to %s") %  ((p or "None"), u, uri))
750
751            if p:
752                # Request with user and project specified
753                req = {\
754                        'destinationTestbed' : { 'uri' : uri },
755                        'credential': [ "project: %s" % p, "user: %s"  % u],
756                        'allocID' : { 'localname': 'test' },
757                    }
758            else:
759                # Request with only user specified
760                req = {\
761                        'destinationTestbed' : { 'uri' : uri },
762                        'credential': [ 'user: %s' % u ],
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771                req['service'] = [
772                        { 'name': 'userconfig', 'visibility': 'export'},
773                        { 'name': 'SMB', 'visibility': 'export'},
774                        { 'name': 'seer', 'visibility': 'export'},
775                        { 'name': 'tmcd', 'visibility': 'export'},
776                    ]
777
778            # node resources if any
779            if nodes != None and len(nodes) > 0:
780                rnodes = [ ]
781                for n in nodes:
782                    rn = { }
783                    image, hw, count = n.split(":")
784                    if image: rn['image'] = [ image ]
785                    if hw: rn['hardware'] = [ hw ]
786                    if count and int(count) >0 : rn['count'] = int(count)
787                    rnodes.append(rn)
788                req['resources']= { }
789                req['resources']['node'] = rnodes
790
791            try:
792                if self.local_access.has_key(uri):
793                    # Local access call
794                    req = { 'RequestAccessRequestBody' : req }
795                    r = self.local_access[uri].RequestAccess(req, 
796                            fedid(file=self.cert_file))
797                    r = { 'RequestAccessResponseBody' : r }
798                else:
799                    r = self.call_RequestAccess(uri, req, 
800                            self.cert_file, self.cert_pwd, self.trusted_certs)
801            except service_error, e:
802                if e.code == service_error.access:
803                    self.log.debug("[get_access] Access denied")
804                    r = None
805                    continue
806                else:
807                    raise e
808
809            if r.has_key('RequestAccessResponseBody'):
810                # Through to here we have a valid response, not a fault.
811                # Access denied is a fault, so something better or worse than
812                # access denied has happened.
813                r = r['RequestAccessResponseBody']
814                self.log.debug("[get_access] Access granted")
815                break
816            else:
817                raise service_error(service_error.protocol,
818                        "Bad proxy response")
819       
820        if not r:
821            raise service_error(service_error.access, 
822                    "Access denied by %s (%s)" % (tb, uri))
823
824        tbparam[tb] = { 
825                "allocID" : r['allocID'],
826                "uri": uri,
827                }
828        if 'service' in r: 
829            services.extend(r['service'])
830
831        # Add attributes to parameter space.  We don't allow attributes to
832        # overlay any parameters already installed.
833        for a in r['fedAttr']:
834            try:
835                if a['attribute'] and \
836                        isinstance(a['attribute'], basestring)\
837                        and not tbparam[tb].has_key(a['attribute'].lower()):
838                    tbparam[tb][a['attribute'].lower()] = a['value']
839            except KeyError:
840                self.log.error("Bad attribute in response: %s" % a)
841
842    def release_access(self, tb, aid, uri=None):
843        """
844        Release access to testbed through fedd
845        """
846
847        if not uri:
848            uri = self.tbmap.get(tb, None)
849        if not uri:
850            raise service_error(service_error.server_config, 
851                    "Unknown testbed: %s" % tb)
852
853        if self.local_access.has_key(uri):
854            resp = self.local_access[uri].ReleaseAccess(\
855                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
856                    fedid(file=self.cert_file))
857            resp = { 'ReleaseAccessResponseBody': resp } 
858        else:
859            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
860                    self.cert_file, self.cert_pwd, self.trusted_certs)
861
862        # better error coding
863
864    def remote_splitter(self, uri, desc, master):
865
866        req = {
867                'description' : { 'ns2description': desc },
868                'master': master,
869                'include_fedkit': bool(self.fedkit),
870                'include_gatewaykit': bool(self.gatewaykit)
871            }
872
873        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
874                self.trusted_certs)
875
876        if r.has_key('Ns2SplitResponseBody'):
877            r = r['Ns2SplitResponseBody']
878            if r.has_key('output'):
879                return r['output'].splitlines()
880            else:
881                raise service_error(service_error.protocol, 
882                        "Bad splitter response (no output)")
883        else:
884            raise service_error(service_error.protocol, "Bad splitter response")
885
886    class start_segment:
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None,
889                log_collector=None):
890            self.log = log
891            self.debug = debug
892            self.cert_file = cert_file
893            self.cert_pwd = cert_pwd
894            self.trusted_certs = None
895            self.caller = caller
896            self.testbed = testbed
897            self.log_collector = log_collector
898            self.response = None
899
900        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
901                services=None):
902            req = {
903                    'allocID': { 'fedid' : aid }, 
904                    'segmentdescription': { 
905                        'topdldescription': topo.to_dict(),
906                    },
907                    'master': master,
908                }
909
910            if connInfo:
911                req['connection'] = connInfo
912            # Add services to request.  The master exports, everyone else
913            # imports.
914            if services:
915                svcs = [ x.copy() for x in services]
916                for s in svcs:
917                    if master: s['visibility'] = 'export'
918                    else: s['visibility'] = 'import'
919                req['service'] = svcs
920            if attrs:
921                req['fedAttr'] = attrs
922
923            try:
924                self.log.debug("Calling StartSegment at %s " % uri)
925                print req
926                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
927                        self.trusted_certs)
928                if r.has_key('StartSegmentResponseBody'):
929                    lval = r['StartSegmentResponseBody'].get('allocationLog',
930                            None)
931                    if lval and self.log_collector:
932                        for line in  lval.splitlines(True):
933                            self.log_collector.write(line)
934                    self.response = r
935                else:
936                    raise service_error(service_error.internal, 
937                            "Bad response!?: %s" %r)
938                return True
939            except service_error, e:
940                self.log.error("Start segment failed on %s: %s" % \
941                        (self.testbed, e))
942                return False
943
944
945
946    class terminate_segment:
947        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
948                cert_pwd=None, trusted_certs=None, caller=None):
949            self.log = log
950            self.debug = debug
951            self.cert_file = cert_file
952            self.cert_pwd = cert_pwd
953            self.trusted_certs = None
954            self.caller = caller
955            self.testbed = testbed
956
957        def __call__(self, uri, aid ):
958            req = {
959                    'allocID': aid , 
960                }
961            try:
962                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
963                        self.trusted_certs)
964                return True
965            except service_error, e:
966                self.log.error("Terminate segment failed on %s: %s" % \
967                        (self.testbed, e))
968                return False
969   
970
971    def allocate_resources(self, allocated, master, eid, expid, 
972            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
973            attrs=None, connInfo={}, services=[]):
974        def get_vlan(r):
975            if r.has_key('StartSegmentResponseBody'):
976                srb = r['StartSegmentResponseBody']
977                if srb.has_key('fedAttr'):
978                    for k, v in [ (a['attribute'], a['value']) \
979                            for a in srb['fedAttr']]:
980                        if k == 'vlan': return v
981            return None
982
983        started = { }           # Testbeds where a sub-experiment started
984                                # successfully
985
986        # XXX
987        fail_soft = False
988
989        non_transit = [ k for k in allocated.keys() \
990                if not topo[k].get_attribute('transit')]
991        transit = [ k for k in allocated.keys() \
992                if topo[k].get_attribute('transit')]
993
994        log = alloc_log or self.log
995
996        thread_pool = self.thread_pool(self.nthreads)
997        threads = [ ]
998
999        for tb in transit:
1000            uri = tbparams[tb]['uri']
1001            if tbparams[tb].has_key('allocID') and \
1002                    tbparams[tb]['allocID'].has_key('fedid'):
1003                aid = tbparams[tb]['allocID']['fedid']
1004            else:
1005                raise service_error(service_error.internal, 
1006                        "No alloc id for testbed %s !?" % tb)
1007
1008            m = re.search('(\d+)', tb)
1009            if m:
1010                to_repl = "unassigned%s" % m.group(1)
1011            else:
1012                raise service_error(service_error.internal, 
1013                        "Bad dynamic allocation name")
1014                break
1015
1016            ss = self.start_segment(log=log, debug=self.debug, 
1017                testbed=tb, cert_file=self.cert_file, 
1018                cert_pwd=self.cert_pwd, 
1019                trusted_certs=self.trusted_certs,
1020                caller=self.call_StartSegment,
1021                log_collector=log_collector)
1022            t = self.pooled_thread(
1023                    target=ss,
1024                    args =(uri, aid, topo[tb], False, attrs, connInfo[tb],
1025                        services),
1026                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1027            threads.append(t)
1028            t.start()
1029            # Wait until the this transit node finishes (keep pinging the log,
1030            # though)
1031
1032            mins = 0
1033            while not thread_pool.wait_for_all_done(60.0):
1034                mins += 1
1035                alloc_log.info("Waiting for transit (it has been %d mins)" \
1036                        % mins)
1037
1038            if t.rv:
1039                vlan = get_vlan(ss.response)
1040                if vlan is not None:
1041                    for k, t in topo.items():
1042                        for e in t.elements:
1043                            for i in e.interface:
1044                                vl = i.get_attribute('dragon_vlan')
1045                                if vl is not None and vl == to_repl:
1046                                    i.set_attribute('dragon_vlan', vlan)
1047            else:
1048                break
1049            thread_pool.clear()
1050
1051
1052        failed = [ t.getName() for t in threads if not t.rv ]
1053
1054        if len(failed) == 0:
1055            for tb in non_transit:
1056                # Create and start a thread to start the segment, and save it
1057                # to get the return value later
1058                thread_pool.wait_for_slot()
1059                uri = self.tbmap.get(tb, None)
1060                if not uri:
1061                    raise service_error(service_error.internal, 
1062                            "Unknown testbed %s !?" % tb)
1063
1064                if tbparams[tb].has_key('allocID') and \
1065                        tbparams[tb]['allocID'].has_key('fedid'):
1066                    aid = tbparams[tb]['allocID']['fedid']
1067                else:
1068                    raise service_error(service_error.internal, 
1069                            "No alloc id for testbed %s !?" % tb)
1070
1071                t  = self.pooled_thread(\
1072                        target=self.start_segment(log=log, debug=self.debug,
1073                            testbed=tb, cert_file=self.cert_file,
1074                            cert_pwd=self.cert_pwd,
1075                            trusted_certs=self.trusted_certs,
1076                            caller=self.call_StartSegment,
1077                            log_collector=log_collector), 
1078                        args=(uri, aid, topo[tb], tb == master, 
1079                            attrs, connInfo[tb], services),
1080                        name=tb,
1081                        pdata=thread_pool, trace_file=self.trace_file)
1082                threads.append(t)
1083                t.start()
1084
1085            # Wait until all finish (keep pinging the log, though)
1086            mins = 0
1087            while not thread_pool.wait_for_all_done(60.0):
1088                mins += 1
1089                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1090                        % mins)
1091
1092            thread_pool.clear()
1093
1094        failed = [ t.getName() for t in threads if not t.rv ]
1095        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1096
1097        # If one failed clean up, unless fail_soft is set
1098        if failed:
1099            if not fail_soft:
1100                thread_pool.clear()
1101                for tb in succeeded:
1102                    # Create and start a thread to stop the segment
1103                    thread_pool.wait_for_slot()
1104                    uri = tbparams[tb]['uri']
1105                    t  = self.pooled_thread(\
1106                            target=self.terminate_segment(log=log,
1107                                testbed=tb,
1108                                cert_file=self.cert_file, 
1109                                cert_pwd=self.cert_pwd,
1110                                trusted_certs=self.trusted_certs,
1111                                caller=self.call_TerminateSegment),
1112                            args=(uri, tbparams[tb]['federant']['allocID']),
1113                            name=tb,
1114                            pdata=thread_pool, trace_file=self.trace_file)
1115                    t.start()
1116                # Wait until all finish
1117                thread_pool.wait_for_all_done()
1118
1119                # release the allocations
1120                for tb in tbparams.keys():
1121                    self.release_access(tb, tbparams[tb]['allocID'],
1122                            tbparams[tb].get('uri', None))
1123                # Remove the placeholder
1124                self.state_lock.acquire()
1125                self.state[eid]['experimentStatus'] = 'failed'
1126                if self.state_filename: self.write_state()
1127                self.state_lock.release()
1128
1129                log.error("Swap in failed on %s" % ",".join(failed))
1130                return
1131        else:
1132            log.info("[start_segment]: Experiment %s active" % eid)
1133
1134
1135        # Walk up tmpdir, deleting as we go
1136        if self.cleanup:
1137            log.debug("[start_experiment]: removing %s" % tmpdir)
1138            for path, dirs, files in os.walk(tmpdir, topdown=False):
1139                for f in files:
1140                    os.remove(os.path.join(path, f))
1141                for d in dirs:
1142                    os.rmdir(os.path.join(path, d))
1143            os.rmdir(tmpdir)
1144        else:
1145            log.debug("[start_experiment]: not removing %s" % tmpdir)
1146
1147        # Insert the experiment into our state and update the disk copy
1148        self.state_lock.acquire()
1149        self.state[expid]['experimentStatus'] = 'active'
1150        self.state[eid] = self.state[expid]
1151        if self.state_filename: self.write_state()
1152        self.state_lock.release()
1153        return
1154
1155
1156    def add_kit(self, e, kit):
1157        """
1158        Add a Software object created from the list of (install, location)
1159        tuples passed as kit  to the software attribute of an object e.  We
1160        do this enough to break out the code, but it's kind of a hack to
1161        avoid changing the old tuple rep.
1162        """
1163
1164        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1165
1166        if isinstance(e.software, list): e.software.extend(s)
1167        else: e.software = s
1168
1169
1170    def create_experiment_state(self, fid, req, expid, expcert, 
1171            state='starting'):
1172        """
1173        Create the initial entry in the experiment's state.  The expid and
1174        expcert are the experiment's fedid and certifacte that represents that
1175        ID, which are installed in the experiment state.  If the request
1176        includes a suggested local name that is used if possible.  If the local
1177        name is already taken by an experiment owned by this user that has
1178        failed, it is overwritten.  Otherwise new letters are added until a
1179        valid localname is found.  The generated local name is returned.
1180        """
1181
1182        if req.has_key('experimentID') and \
1183                req['experimentID'].has_key('localname'):
1184            overwrite = False
1185            eid = req['experimentID']['localname']
1186            # If there's an old failed experiment here with the same local name
1187            # and accessible by this user, we'll overwrite it, otherwise we'll
1188            # fall through and do the collision avoidance.
1189            old_expid = self.get_experiment_fedid(eid)
1190            if old_expid and self.check_experiment_access(fid, old_expid):
1191                self.state_lock.acquire()
1192                status = self.state[eid].get('experimentStatus', None)
1193                if status and status == 'failed':
1194                    # remove the old access attribute
1195                    self.auth.unset_attribute(fid, old_expid)
1196                    overwrite = True
1197                    del self.state[eid]
1198                    del self.state[old_expid]
1199                self.state_lock.release()
1200            self.state_lock.acquire()
1201            while (self.state.has_key(eid) and not overwrite):
1202                eid += random.choice(string.ascii_letters)
1203            # Initial state
1204            self.state[eid] = {
1205                    'experimentID' : \
1206                            [ { 'localname' : eid }, {'fedid': expid } ],
1207                    'experimentStatus': state,
1208                    'experimentAccess': { 'X509' : expcert },
1209                    'owner': fid,
1210                    'log' : [],
1211                }
1212            self.state[expid] = self.state[eid]
1213            if self.state_filename: self.write_state()
1214            self.state_lock.release()
1215        else:
1216            eid = self.exp_stem
1217            for i in range(0,5):
1218                eid += random.choice(string.ascii_letters)
1219            self.state_lock.acquire()
1220            while (self.state.has_key(eid)):
1221                eid = self.exp_stem
1222                for i in range(0,5):
1223                    eid += random.choice(string.ascii_letters)
1224            # Initial state
1225            self.state[eid] = {
1226                    'experimentID' : \
1227                            [ { 'localname' : eid }, {'fedid': expid } ],
1228                    'experimentStatus': state,
1229                    'experimentAccess': { 'X509' : expcert },
1230                    'owner': fid,
1231                    'log' : [],
1232                }
1233            self.state[expid] = self.state[eid]
1234            if self.state_filename: self.write_state()
1235            self.state_lock.release()
1236
1237        return eid
1238
1239
1240    def allocate_ips_to_topo(self, top):
1241        """
1242        Add an ip4_address attribute to all the hosts in the topology, based on
1243        the shared substrates on which they sit.  An /etc/hosts file is also
1244        created and returned as a list of hostfiles entries.  We also return
1245        the allocator, because we may need to allocate IPs to portals
1246        (specifically DRAGON portals).
1247        """
1248        subs = sorted(top.substrates, 
1249                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1250                reverse=True)
1251        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1252        ifs = { }
1253        hosts = [ ]
1254
1255        for idx, s in enumerate(subs):
1256            a = ips.allocate(len(s.interfaces)+2)
1257            if a :
1258                base, num = a
1259                if num < len(s.interfaces) +2 : 
1260                    raise service_error(service_error.internal,
1261                            "Allocator returned wrong number of IPs??")
1262            else:
1263                raise service_error(service_error.req, 
1264                        "Cannot allocate IP addresses")
1265
1266            base += 1
1267            for i in s.interfaces:
1268                i.attribute.append(
1269                        topdl.Attribute('ip4_address', 
1270                            "%s" % ip_addr(base)))
1271                hname = i.element.name[0]
1272                if ifs.has_key(hname):
1273                    hosts.append("%s\t%s-%s %s-%d" % \
1274                            (ip_addr(base), hname, s.name, hname,
1275                                ifs[hname]))
1276                else:
1277                    ifs[hname] = 0
1278                    hosts.append("%s\t%s-%s %s-%d %s" % \
1279                            (ip_addr(base), hname, s.name, hname,
1280                                ifs[hname], hname))
1281
1282                ifs[hname] += 1
1283                base += 1
1284        return hosts, ips
1285
1286    def get_access_to_testbeds(self, testbeds, access_user, 
1287            export_project, master, allocated, tbparams, services):
1288        """
1289        Request access to the various testbeds required for this instantiation
1290        (passed in as testbeds).  User, access_user, expoert_project and master
1291        are used to construct the correct requests.  Per-testbed parameters are
1292        returned in tbparams.
1293        """
1294        for tb in testbeds:
1295            self.get_access(tb, None, tbparams, master,
1296                    export_project, access_user, services)
1297            allocated[tb] = 1
1298
1299    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1300        """
1301        Create the sub-topologies that are needed for experiment instantiation.
1302        """
1303        for tb in testbeds:
1304            topo[tb] = top.clone()
1305            to_delete = [ ]
1306            # XXX: copy in for loop to simplify
1307            for e in topo[tb].elements:
1308                etb = e.get_attribute('testbed')
1309                if etb and etb != tb:
1310                    for i in e.interface:
1311                        for s in i.subs:
1312                            try:
1313                                s.interfaces.remove(i)
1314                            except ValueError:
1315                                raise service_error(service_error.internal,
1316                                        "Can't remove interface??")
1317                    to_delete.append(e)
1318            for e in to_delete:
1319                topo[tb].elements.remove(e)
1320            topo[tb].make_indices()
1321
1322            for e in [ e for e in topo[tb].elements \
1323                    if isinstance(e,topdl.Computer)]:
1324                if self.fedkit: self.add_kit(e, self.fedkit)
1325
1326    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1327            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[]):
1328        """
1329        Return a new internet portal node and a dict with the connectionInfo to
1330        be attached.
1331        """
1332        dproject = tbparams[dt].get('project', 'project')
1333        ddomain = tbparams[dt].get('domain', ".example.com")
1334        mdomain = tbparams[master].get('domain', '.example.com')
1335        mproject = tbparams[master].get('project', 'project')
1336        muser = tbparams[master].get('user', 'root')
1337        smbshare = tbparams[master].get('smbshare', 'USERS')
1338
1339        if st == master or dt == master:
1340            active = ("%s" % (st == master))
1341        else:
1342            active = ("%s" % (st > dt))
1343
1344        ifaces = [ ]
1345        for sub, attrs in iface_desc:
1346            inf = topdl.Interface(
1347                    name="inf%03d" % len(ifaces),
1348                    substrate=sub,
1349                    attribute=[
1350                        topdl.Attribute(
1351                            attribute=n,
1352                            value = v)
1353                        for n, v in attrs
1354                        ]
1355                    )
1356            ifaces.append(inf)
1357        info = {
1358                "type" : conn_type, 
1359                "portal": myname,
1360                'peer': desthost,
1361                'fedAttr': [ 
1362                        { 'attribute': 'masterdomain', 'value': mdomain},
1363                        { 'attribute': 'masterexperiment', 'value': 
1364                            "%s/%s" % (mproject, eid)},
1365                        { 'attribute': 'active', 'value': active},
1366                        # Move to SMB service description
1367                        { 'attribute': 'masteruser', 'value': muser},
1368                        { 'attribute': 'smbshare', 'value': smbshare},
1369                    ],
1370                }
1371
1372        if conn_type == 'transit':
1373            info['member'] = [ { 'element': myname, 'interface': 'inf000'} ]
1374        if conn_attrs:
1375            info['fedAttr'].extend(con_attrs)
1376
1377        return (topdl.Computer(
1378                name=myname,
1379                attribute=[ 
1380                    topdl.Attribute(attribute=n,value=v)
1381                        for n, v in (\
1382                            ('portal', 'true'),
1383                            ('portal_type', portal_type), 
1384                        )
1385                    ],
1386                interface=ifaces,
1387                ), info)
1388
1389    def new_portal_substrate(self, st, dt, eid, tbparams):
1390        ddomain = tbparams[dt].get('domain', ".example.com")
1391        dproject = tbparams[dt].get('project', 'project')
1392        tsubstrate = \
1393                topdl.Substrate(name='%s-%s' % (st, dt),
1394                        attribute= [
1395                            topdl.Attribute(
1396                                attribute='portal',
1397                                value='true')
1398                            ]
1399                        )
1400        segment_element = topdl.Segment(
1401                id= tbparams[dt]['allocID'],
1402                type='emulab',
1403                uri = self.tbmap.get(dt, None),
1404                interface=[ 
1405                    topdl.Interface(
1406                        substrate=tsubstrate.name),
1407                    ],
1408                attribute = [
1409                    topdl.Attribute(attribute=n, value=v)
1410                        for n, v in (\
1411                            ('domain', ddomain),
1412                            ('experiment', "%s/%s" % \
1413                                    (dproject, eid)),)
1414                    ],
1415                )
1416
1417        return (tsubstrate, segment_element)
1418
1419    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1420        if sub.capacity is None:
1421            raise service_error(service_error.internal,
1422                    "Cannot DRAGON split substrate w/o capacity")
1423        segs = [ ]
1424        substr = topdl.Substrate(name="dragon%d" % idx, 
1425                capacity=sub.capacity.clone(),
1426                attribute=[ topdl.Attribute(attribute=n, value=v)
1427                    for n, v, in (\
1428                            ('vlan', 'unassigned%d' % idx),)])
1429        for tb in tbs.keys():
1430            seg = topdl.Segment(
1431                    id = tbparams[tb]['allocID'],
1432                    type='emulab',
1433                    uri = self.tbmap.get(tb, None),
1434                    interface=[ 
1435                        topdl.Interface(
1436                            substrate=substr.name),
1437                        ],
1438                    attribute=[ topdl.Attribute(
1439                        attribute='dragon_endpoint', 
1440                        value=tbparams[tb]['dragon']),
1441                        ]
1442                    )
1443            if tbparams[tb].has_key('vlans'):
1444                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1445            segs.append(seg)
1446
1447        topo["dragon%d" %idx] = \
1448                topdl.Topology(substrates=[substr], elements=segs,
1449                        attribute=[
1450                            topdl.Attribute(attribute="transit", value='true'),
1451                            topdl.Attribute(attribute="dynamic", value='true'),
1452                            topdl.Attribute(attribute="testbed", value='dragon'),
1453                            ]
1454                        )
1455
1456    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1457            connInfo):
1458        """
1459        Add attribiutes to the various elements indicating that they are to be
1460        dragon connected and create a dragon segment in topo to be
1461        instantiated.
1462        """
1463
1464        def get_substrate_from_topo(name, t):
1465            for s in t.substrates:
1466                if s.name == name: return s
1467            else: return None
1468
1469        # dn is the number of previously created dragon nets.  This routine
1470        # creates a net numbered by dn
1471        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1472        # Count the number of interfaces on this substrate in each testbed from
1473        # the global topology
1474        count = { }
1475        for e in [ i.element for i in sub.interfaces ]:
1476            tb = e.get_attribute('testbed')
1477            count[tb] = count.get(tb, 0) + 1
1478
1479        # Set the attributes in the copies that will allow setup of dragon
1480        # connections.
1481        for tb in tbs.keys():
1482            s = get_substrate_from_topo(sub.name, topo[tb])
1483            if s:
1484                if not connInfo.has_key(tb):
1485                    connInfo[tb] = [ ]
1486                info = {
1487                        'type': 'transit',
1488                        'member': [ {
1489                            'element': i.element.name[0], 
1490                            'interface': i.name
1491                            } for i in s.interfaces ],
1492                        'fedAttr': [ { 
1493                            'attribute': 'vlan_id',
1494                            'value': 'unassigned%d' % dn
1495                            } ]
1496                        }
1497                connInfo[tb].append(info)
1498            else:
1499                raise service_error(service_error.internal,
1500                        "No substrate %s in testbed %s" % (sub.name, tb))
1501
1502        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1503
1504    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1505            segment_substrate, portals, connInfo):
1506        # More than one testbed is on this substrate.  Insert
1507        # some portals into the subtopologies.  st == source testbed,
1508        # dt == destination testbed.
1509        for st in tbs.keys():
1510            if not segment_substrate.has_key(st):
1511                segment_substrate[st] = { }
1512            if not portals.has_key(st): 
1513                portals[st] = { }
1514            if not connInfo.has_key(st):
1515                connInfo[st] = [ ]
1516            for dt in [ t for t in tbs.keys() if t != st]:
1517                sproject = tbparams[st].get('project', 'project')
1518                dproject = tbparams[dt].get('project', 'project')
1519                mproject = tbparams[master].get('project', 'project')
1520                sdomain = tbparams[st].get('domain', ".example.com")
1521                ddomain = tbparams[dt].get('domain', ".example.com")
1522                mdomain = tbparams[master].get('domain', '.example.com')
1523                muser = tbparams[master].get('user', 'root')
1524                smbshare = tbparams[master].get('smbshare', 'USERS')
1525                aid = tbparams[dt]['allocID']['fedid']
1526                if st == master or dt == master:
1527                    active = ("%s" % (st == master))
1528                else:
1529                    active = ("%s" %(st > dt))
1530                if not segment_substrate[st].has_key(dt):
1531                    # Put a substrate and a segment for the connected
1532                    # testbed in there.
1533                    tsubstrate, segment_element = \
1534                            self.new_portal_substrate(st, dt, eid, tbparams)
1535                    segment_substrate[st][dt] = tsubstrate
1536                    topo[st].substrates.append(tsubstrate)
1537                    topo[st].elements.append(segment_element)
1538
1539                new_portal = False
1540                if portals[st].has_key(dt):
1541                    # There's a portal set up to go to this destination.
1542                    # See if there's room to multiplex this connection on
1543                    # it.  If so, add an interface to the portal; if not,
1544                    # set up to add a portal below.
1545                    # [This little festival of braces is just a pop of the
1546                    # last element in the list of portals between st and
1547                    # dt.]
1548                    portal = portals[st][dt][-1]
1549                    mux = len([ i for i in portal.interface \
1550                            if not i.get_attribute('portal')])
1551                    if mux == self.muxmax:
1552                        new_portal = True
1553                        portal_type = "experiment"
1554                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1555                        desthost = "%stunnel%d.%s.%s%s" % (st, 
1556                                len(portals[st][dt]), eid.lower(),
1557                                dproject.lower(), ddomain.lower())
1558                    else:
1559                        new_i = topdl.Interface(
1560                                substrate=sub.name,
1561                                attribute=[ 
1562                                    topdl.Attribute(
1563                                        attribute='ip4_address', 
1564                                        value=tbs[dt]
1565                                    )
1566                                ])
1567                        portal.interface.append(new_i)
1568                else:
1569                    # First connection to this testbed, make an empty list
1570                    # and set up to add the new portal below
1571                    new_portal = True
1572                    portals[st][dt] = [ ]
1573                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1574                    desthost = "%stunnel%d.%s.%s%s" % (st.lower(), 
1575                            len(portals[st][dt]), eid.lower(), 
1576                            dproject.lower(), ddomain.lower())
1577
1578                    if dt == master or st == master: portal_type = "both"
1579                    else: portal_type = "experiment"
1580
1581                if new_portal:
1582                    infs = (
1583                            (segment_substrate[st][dt].name, 
1584                                (('portal', 'true'),)),
1585                            (sub.name, 
1586                                (('ip4_address', tbs[dt]),))
1587                        )
1588                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1589                            master, eid, myname, desthost, portal_type,
1590                            infs)
1591                    if self.fedkit:
1592                        self.add_kit(portal, self.fedkit)
1593                    if self.gatewaykit: 
1594                        self.add_kit(portal, self.gatewaykit)
1595
1596                    topo[st].elements.append(portal)
1597                    portals[st][dt].append(portal)
1598                    connInfo[st].append(info)
1599
1600    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo):
1601        # Add to the master testbed
1602        tsubstrate, segment_element = \
1603                self.new_portal_substrate(st, dt, eid, tbparams)
1604        myname = "%stunnel" % dt
1605        desthost = "%stunnel" % st
1606
1607        portal, info = self.new_portal_node(st, dt, tbparams, master,
1608                eid, myname, desthost, "control", 
1609                ((tsubstrate.name,(('portal','true'),)),))
1610        if self.fedkit:
1611            self.add_kit(portal, self.fedkit)
1612        if self.gatewaykit: 
1613            self.add_kit(portal, self.gatewaykit)
1614
1615        topo[st].substrates.append(tsubstrate)
1616        topo[st].elements.append(segment_element)
1617        topo[st].elements.append(portal)
1618        if not connInfo.has_key(st):
1619            connInfo[st] = [ ]
1620        connInfo[st].append(info)
1621
1622    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1623            substrate, tbparams):
1624        # Add to the master testbed
1625        myname = "%stunnel" % dt
1626        desthost = "%s" % ip_addr(dip)
1627
1628        portal, info = self.new_portal_node(st, dt, tbparams, master,
1629                eid, myname, desthost, "control", 
1630                ((substrate.name,(
1631                    ('portal','true'),
1632                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1633                conn_type="transit",
1634                conn_attrs = [ {
1635                    'attribute': 'vlan_id', 'value': 'unassigned%d' % idx
1636                    }])
1637        if self.fedkit:
1638            self.add_kit(portal, self.fedkit)
1639        if self.gatewaykit: 
1640            self.add_kit(portal, self.gatewaykit)
1641
1642        return portal, info
1643
1644    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1645            connInfo):
1646        """
1647        For each substrate in the main topology, find those that
1648        have nodes on more than one testbed.  Insert portal nodes
1649        into the copies of those substrates on the sub topologies.
1650        """
1651        segment_substrate = { }
1652        portals = { }
1653        for s in top.substrates:
1654            # tbs will contain an ip address on this subsrate that is in
1655            # each testbed.
1656            tbs = { }
1657            for i in s.interfaces:
1658                e = i.element
1659                tb = e.get_attribute('testbed')
1660                if tb and not tbs.has_key(tb):
1661                    for i in e.interface:
1662                        if s in i.subs:
1663                            tbs[tb]= i.get_attribute('ip4_address')
1664            if len(tbs) < 2:
1665                continue
1666
1667            # DRAGON will not create multi-site vlans yet
1668            if len(tbs) == 2 and \
1669                    all([tbparams[x].has_key('dragon') for x in tbs]):
1670                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1671                        master, eid, connInfo)
1672            else:
1673                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1674                        eid, segment_substrate, portals, connInfo)
1675
1676        # Make sure that all the slaves have a control portal back to the
1677        # master.
1678        for tb in [ t for t in tbparams.keys() if t != master ]:
1679            if len([e for e in topo[tb].elements \
1680                    if isinstance(e, topdl.Computer) and \
1681                    e.get_attribute('portal') and \
1682                    e.get_attribute('portal_type') == 'both']) == 0:
1683
1684                if tbparams[master].has_key('dragon') \
1685                        and tbparams[tb].has_key('dragon'):
1686
1687                    idx = len([x for x in topo.keys() \
1688                            if x.startswith('dragon')])
1689                    dip, leng = ip_allocator.allocate(4)
1690                    dip += 1
1691                    mip = dip+1
1692                    csub = topdl.Substrate(
1693                            name="dragon-control-%s" % tb,
1694                            capacity=topdl.Capacity(100000.0, 'max'),
1695                            attribute=[
1696                                topdl.Attribute(
1697                                    attribute='portal',
1698                                    value='true'
1699                                    )
1700                                ]
1701                            )
1702                    seg = topdl.Segment(
1703                            id= tbparams[master]['allocID'],
1704                            type='emulab',
1705                            uri = self.tbmap.get(master, None),
1706                            interface=[ 
1707                                topdl.Interface(
1708                                    substrate=csub.name),
1709                                ],
1710                            attribute = [
1711                                topdl.Attribute(attribute=n, value=v)
1712                                    for n, v in (\
1713                                        ('domain', 
1714                                            tbparams[master].get('domain',
1715                                                ".example.com")),
1716                                        ('experiment', "%s/%s" % \
1717                                                (tbparams[master].get(
1718                                                    'project', 
1719                                                    'project'), 
1720                                                    eid)),)
1721                                ],
1722                            )
1723                    portal, info = self.new_dragon_portal(tb, master,
1724                            master, eid, dip, mip, idx, csub, tbparams)
1725                    topo[tb].substrates.append(csub)
1726                    topo[tb].elements.append(portal)
1727                    topo[tb].elements.append(seg)
1728                    if not connInfo.has_key(tb):
1729                        connInfo[tb] = [info]
1730                    else:
1731                        connInfo[tb].append(info)
1732
1733                    mcsub = csub.clone()
1734                    seg = topdl.Segment(
1735                            id= tbparams[tb]['allocID'],
1736                            type='emulab',
1737                            uri = self.tbmap.get(tb, None),
1738                            interface=[ 
1739                                topdl.Interface(
1740                                    substrate=csub.name),
1741                                ],
1742                            attribute = [
1743                                topdl.Attribute(attribute=n, value=v)
1744                                    for n, v in (\
1745                                        ('domain', 
1746                                            tbparams[tb].get('domain',
1747                                                ".example.com")),
1748                                        ('experiment', "%s/%s" % \
1749                                                (tbparams[tb].get('project', 
1750                                                    'project'), 
1751                                                    eid)),)
1752                                ],
1753                            )
1754                    portal, info = self.new_dragon_portal(master, tb, master,
1755                            eid, mip, dip, idx, mcsub, tbparams)
1756                    topo[master].substrates.append(mcsub)
1757                    topo[master].elements.append(portal)
1758                    topo[master].elements.append(seg)
1759                    if not connInfo.has_key(master):
1760                        connInfo[master] = [info]
1761                    else:
1762                        connInfo[master].append(info)
1763
1764                    self.create_dragon_substrate(csub, topo, 
1765                            {tb: 1, master:1}, tbparams, master, eid)
1766                else:
1767                    self.add_control_portal(master, tb, master, eid, topo, 
1768                            tbparams, connInfo)
1769                    self.add_control_portal(tb, master, master, eid, topo, 
1770                            tbparams, connInfo)
1771
1772        # Connect the portal nodes into the topologies and clear out
1773        # substrates that are not in the topologies
1774        for tb in tbparams.keys():
1775            topo[tb].incorporate_elements()
1776            topo[tb].substrates = \
1777                    [s for s in topo[tb].substrates \
1778                        if len(s.interfaces) >0]
1779
1780    def wrangle_software(self, expid, top, topo, tbparams):
1781        """
1782        Copy software out to the repository directory, allocate permissions and
1783        rewrite the segment topologies to look for the software in local
1784        places.
1785        """
1786
1787        # Copy the rpms and tarfiles to a distribution directory from
1788        # which the federants can retrieve them
1789        linkpath = "%s/software" %  expid
1790        softdir ="%s/%s" % ( self.repodir, linkpath)
1791        softmap = { }
1792        # These are in a list of tuples format (each kit).  This comprehension
1793        # unwraps them into a single list of tuples that initilaizes the set of
1794        # tuples.
1795        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1796                for p, t in l ])
1797        pkgs.update([x.location for e in top.elements \
1798                for x in e.software])
1799        try:
1800            os.makedirs(softdir)
1801        except IOError, e:
1802            raise service_error(
1803                    "Cannot create software directory: %s" % e)
1804        # The actual copying.  Everything's converted into a url for copying.
1805        for pkg in pkgs:
1806            loc = pkg
1807
1808            scheme, host, path = urlparse(loc)[0:3]
1809            dest = os.path.basename(path)
1810            if not scheme:
1811                if not loc.startswith('/'):
1812                    loc = "/%s" % loc
1813                loc = "file://%s" %loc
1814            try:
1815                u = urlopen(loc)
1816            except Exception, e:
1817                raise service_error(service_error.req, 
1818                        "Cannot open %s: %s" % (loc, e))
1819            try:
1820                f = open("%s/%s" % (softdir, dest) , "w")
1821                self.log.debug("Writing %s/%s" % (softdir,dest) )
1822                data = u.read(4096)
1823                while data:
1824                    f.write(data)
1825                    data = u.read(4096)
1826                f.close()
1827                u.close()
1828            except Exception, e:
1829                raise service_error(service_error.internal,
1830                        "Could not copy %s: %s" % (loc, e))
1831            path = re.sub("/tmp", "", linkpath)
1832            # XXX
1833            softmap[pkg] = \
1834                    "%s/%s/%s" %\
1835                    ( self.repo_url, path, dest)
1836
1837            # Allow the individual segments to access the software.
1838            for tb in tbparams.keys():
1839                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1840                        "/%s/%s" % ( path, dest))
1841
1842        # Convert the software locations in the segments into the local
1843        # copies on this host
1844        for soft in [ s for tb in topo.values() \
1845                for e in tb.elements \
1846                    if getattr(e, 'software', False) \
1847                        for s in e.software ]:
1848            if softmap.has_key(soft.location):
1849                soft.location = softmap[soft.location]
1850
1851
1852    def new_experiment(self, req, fid):
1853        """
1854        The external interface to empty initial experiment creation called from
1855        the dispatcher.
1856
1857        Creates a working directory, splits the incoming description using the
1858        splitter script and parses out the avrious subsections using the
1859        lcasses above.  Once each sub-experiment is created, use pooled threads
1860        to instantiate them and start it all up.
1861        """
1862        if not self.auth.check_attribute(fid, 'new'):
1863            raise service_error(service_error.access, "New access denied")
1864
1865        try:
1866            tmpdir = tempfile.mkdtemp(prefix="split-")
1867        except IOError:
1868            raise service_error(service_error.internal, "Cannot create tmp dir")
1869
1870        try:
1871            access_user = self.accessdb[fid]
1872        except KeyError:
1873            raise service_error(service_error.internal,
1874                    "Access map and authorizer out of sync in " + \
1875                            "new_experiment for fedid %s"  % fid)
1876
1877        pid = "dummy"
1878        gid = "dummy"
1879
1880        req = req.get('NewRequestBody', None)
1881        if not req:
1882            raise service_error(service_error.req,
1883                    "Bad request format (no NewRequestBody)")
1884
1885        # Generate an ID for the experiment (slice) and a certificate that the
1886        # allocator can use to prove they own it.  We'll ship it back through
1887        # the encrypted connection.
1888        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1889
1890        #now we're done with the tmpdir, and it should be empty
1891        if self.cleanup:
1892            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1893            os.rmdir(tmpdir)
1894        else:
1895            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1896
1897        eid = self.create_experiment_state(fid, req, expid, expcert, 
1898                state='empty')
1899
1900        # Let users touch the state
1901        self.auth.set_attribute(fid, expid)
1902        self.auth.set_attribute(expid, expid)
1903        # Override fedids can manipulate state as well
1904        for o in self.overrides:
1905            self.auth.set_attribute(o, expid)
1906
1907        rv = {
1908                'experimentID': [
1909                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1910                ],
1911                'experimentStatus': 'empty',
1912                'experimentAccess': { 'X509' : expcert }
1913            }
1914
1915        return rv
1916
1917
1918    def create_experiment(self, req, fid):
1919        """
1920        The external interface to experiment creation called from the
1921        dispatcher.
1922
1923        Creates a working directory, splits the incoming description using the
1924        splitter script and parses out the avrious subsections using the
1925        lcasses above.  Once each sub-experiment is created, use pooled threads
1926        to instantiate them and start it all up.
1927        """
1928
1929        req = req.get('CreateRequestBody', None)
1930        if not req:
1931            raise service_error(service_error.req,
1932                    "Bad request format (no CreateRequestBody)")
1933
1934        # Get the experiment access
1935        exp = req.get('experimentID', None)
1936        if exp:
1937            if exp.has_key('fedid'):
1938                key = exp['fedid']
1939                expid = key
1940                eid = None
1941            elif exp.has_key('localname'):
1942                key = exp['localname']
1943                eid = key
1944                expid = None
1945            else:
1946                raise service_error(service_error.req, "Unknown lookup type")
1947        else:
1948            raise service_error(service_error.req, "No request?")
1949
1950        self.check_experiment_access(fid, key)
1951
1952        try:
1953            tmpdir = tempfile.mkdtemp(prefix="split-")
1954            os.mkdir(tmpdir+"/keys")
1955        except IOError:
1956            raise service_error(service_error.internal, "Cannot create tmp dir")
1957
1958        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1959        gw_secretkey_base = "fed.%s" % self.ssh_type
1960        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1961        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1962        tclfile = tmpdir + "/experiment.tcl"
1963        tbparams = { }
1964        try:
1965            access_user = self.accessdb[fid]
1966        except KeyError:
1967            raise service_error(service_error.internal,
1968                    "Access map and authorizer out of sync in " + \
1969                            "create_experiment for fedid %s"  % fid)
1970
1971        pid = "dummy"
1972        gid = "dummy"
1973
1974        # The tcl parser needs to read a file so put the content into that file
1975        descr=req.get('experimentdescription', None)
1976        if descr:
1977            file_content=descr.get('ns2description', None)
1978            if file_content:
1979                try:
1980                    f = open(tclfile, 'w')
1981                    f.write(file_content)
1982                    f.close()
1983                except IOError:
1984                    raise service_error(service_error.internal,
1985                            "Cannot write temp experiment description")
1986            else:
1987                raise service_error(service_error.req, 
1988                        "Only ns2descriptions supported")
1989        else:
1990            raise service_error(service_error.req, "No experiment description")
1991
1992        self.state_lock.acquire()
1993        if self.state.has_key(key):
1994            self.state[key]['experimentStatus'] = "starting"
1995            for e in self.state[key].get('experimentID',[]):
1996                if not expid and e.has_key('fedid'):
1997                    expid = e['fedid']
1998                elif not eid and e.has_key('localname'):
1999                    eid = e['localname']
2000        self.state_lock.release()
2001
2002        if not (eid and expid):
2003            raise service_error(service_error.internal, 
2004                    "Cannot find local experiment info!?")
2005
2006        try: 
2007            # This catches exceptions to clear the placeholder if necessary
2008            try:
2009                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2010            except ValueError:
2011                raise service_error(service_error.server_config, 
2012                        "Bad key type (%s)" % self.ssh_type)
2013
2014            master = req.get('master', None)
2015            if not master:
2016                raise service_error(service_error.req,
2017                        "No master testbed label")
2018            export_project = req.get('exportProject', None)
2019            if not export_project:
2020                raise service_error(service_error.req, "No export project")
2021           
2022            # Translate to topdl
2023            if self.splitter_url:
2024                # XXX: need remote topdl translator
2025                self.log.debug("Calling remote splitter at %s" % \
2026                        self.splitter_url)
2027                split_data = self.remote_splitter(self.splitter_url,
2028                        file_content, master)
2029            else:
2030                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2031                    str(self.muxmax), '-m', master]
2032
2033                if self.fedkit:
2034                    tclcmd.append('-k')
2035
2036                if self.gatewaykit:
2037                    tclcmd.append('-K')
2038
2039                tclcmd.extend([pid, gid, eid, tclfile])
2040
2041                self.log.debug("running local splitter %s", " ".join(tclcmd))
2042                # This is just fantastic.  As a side effect the parser copies
2043                # tb_compat.tcl into the current directory, so that directory
2044                # must be writable by the fedd user.  Doing this in the
2045                # temporary subdir ensures this is the case.
2046                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2047                        cwd=tmpdir)
2048                split_data = tclparser.stdout
2049
2050            top = topdl.topology_from_xml(file=split_data, top="experiment")
2051
2052            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2053             # Find the testbeds to look up
2054            testbeds = set([ a.value for e in top.elements \
2055                    for a in e.attribute \
2056                    if a.attribute == 'testbed'] )
2057
2058            allocated = { }         # Testbeds we can access
2059            topo ={ }               # Sub topologies
2060            connInfo = { }          # Connection information
2061            services = [ ]
2062            self.get_access_to_testbeds(testbeds, access_user, 
2063                    export_project, master, allocated, tbparams, services)
2064            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2065
2066            # Copy configuration files into the remote file store
2067            # The config urlpath
2068            configpath = "/%s/config" % expid
2069            # The config file system location
2070            configdir ="%s%s" % ( self.repodir, configpath)
2071            try:
2072                os.makedirs(configdir)
2073            except IOError, e:
2074                raise service_error(
2075                        "Cannot create config directory: %s" % e)
2076            try:
2077                f = open("%s/hosts" % configdir, "w")
2078                f.write('\n'.join(hosts))
2079                f.close()
2080            except IOError, e:
2081                raise service_error(service_error.internal, 
2082                        "Cannot write hosts file: %s" % e)
2083            try:
2084                copy_file("%s" % gw_pubkey, "%s/%s" % \
2085                        (configdir, gw_pubkey_base))
2086                copy_file("%s" % gw_secretkey, "%s/%s" % \
2087                        (configdir, gw_secretkey_base))
2088            except IOError, e:
2089                raise service_error(service_error.internal, 
2090                        "Cannot copy keyfiles: %s" % e)
2091
2092            # Allow the individual testbeds to access the configuration files.
2093            for tb in tbparams.keys():
2094                asignee = tbparams[tb]['allocID']['fedid']
2095                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2096                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2097
2098            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2099                    connInfo)
2100            # Now get access to the dynamic testbeds
2101            for k, t in topo.items():
2102                if not t.get_attribute('dynamic'):
2103                    continue
2104                tb = t.get_attribute('testbed')
2105                if tb: 
2106                    self.get_access(tb, None, user, tbparams, master, 
2107                            export_project, access_user, services)
2108                    tbparams[k] = tbparams[tb]
2109                    del tbparams[tb]
2110                    allocated[k] = 1
2111                else:
2112                    raise service_error(service_error.internal, 
2113                            "Dynamic allocation from no testbed!?")
2114
2115            self.wrangle_software(expid, top, topo, tbparams)
2116
2117            vtopo = topdl.topology_to_vtopo(top)
2118            vis = self.genviz(vtopo)
2119
2120            # save federant information
2121            for k in allocated.keys():
2122                tbparams[k]['federant'] = {
2123                        'name': [ { 'localname' : eid} ],
2124                        'allocID' : tbparams[k]['allocID'],
2125                        'master' : k == master,
2126                        'uri': tbparams[k]['uri'],
2127                    }
2128                if tbparams[k].has_key('emulab'):
2129                        tbparams[k]['federant']['emulab'] = \
2130                                tbparams[k]['emulab']
2131
2132            self.state_lock.acquire()
2133            self.state[eid]['vtopo'] = vtopo
2134            self.state[eid]['vis'] = vis
2135            self.state[expid]['federant'] = \
2136                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2137                        if tbparams[tb].has_key('federant') ]
2138            if self.state_filename: 
2139                self.write_state()
2140            self.state_lock.release()
2141        except service_error, e:
2142            # If something goes wrong in the parse (usually an access error)
2143            # clear the placeholder state.  From here on out the code delays
2144            # exceptions.  Failing at this point returns a fault to the remote
2145            # caller.
2146
2147            self.state_lock.acquire()
2148            del self.state[eid]
2149            del self.state[expid]
2150            if self.state_filename: self.write_state()
2151            self.state_lock.release()
2152            raise e
2153
2154
2155        # Start the background swapper and return the starting state.  From
2156        # here on out, the state will stick around a while.
2157
2158        # Let users touch the state
2159        self.auth.set_attribute(fid, expid)
2160        self.auth.set_attribute(expid, expid)
2161        # Override fedids can manipulate state as well
2162        for o in self.overrides:
2163            self.auth.set_attribute(o, expid)
2164
2165        # Create a logger that logs to the experiment's state object as well as
2166        # to the main log file.
2167        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2168        alloc_collector = self.list_log(self.state[eid]['log'])
2169        h = logging.StreamHandler(alloc_collector)
2170        # XXX: there should be a global one of these rather than repeating the
2171        # code.
2172        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2173                    '%d %b %y %H:%M:%S'))
2174        alloc_log.addHandler(h)
2175       
2176        attrs = [ 
2177                {
2178                    'attribute': 'ssh_pubkey', 
2179                    'value': '%s/%s/config/%s' % \
2180                            (self.repo_url, expid, gw_pubkey_base)
2181                },
2182                {
2183                    'attribute': 'ssh_secretkey', 
2184                    'value': '%s/%s/config/%s' % \
2185                            (self.repo_url, expid, gw_secretkey_base)
2186                },
2187                {
2188                    'attribute': 'hosts', 
2189                    'value': '%s/%s/config/hosts' % \
2190                            (self.repo_url, expid)
2191                },
2192                {
2193                    'attribute': 'experiment_name',
2194                    'value': eid,
2195                },
2196            ]
2197
2198        # Start a thread to do the resource allocation
2199        t  = Thread(target=self.allocate_resources,
2200                args=(allocated, master, eid, expid, tbparams, 
2201                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2202                    services),
2203                name=eid)
2204        t.start()
2205
2206        rv = {
2207                'experimentID': [
2208                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2209                ],
2210                'experimentStatus': 'starting',
2211            }
2212
2213        return rv
2214   
2215    def get_experiment_fedid(self, key):
2216        """
2217        find the fedid associated with the localname key in the state database.
2218        """
2219
2220        rv = None
2221        self.state_lock.acquire()
2222        if self.state.has_key(key):
2223            if isinstance(self.state[key], dict):
2224                try:
2225                    kl = [ f['fedid'] for f in \
2226                            self.state[key]['experimentID']\
2227                                if f.has_key('fedid') ]
2228                except KeyError:
2229                    self.state_lock.release()
2230                    raise service_error(service_error.internal, 
2231                            "No fedid for experiment %s when getting "+\
2232                                    "fedid(!?)" % key)
2233                if len(kl) == 1:
2234                    rv = kl[0]
2235                else:
2236                    self.state_lock.release()
2237                    raise service_error(service_error.internal, 
2238                            "multiple fedids for experiment %s when " +\
2239                                    "getting fedid(!?)" % key)
2240            else:
2241                self.state_lock.release()
2242                raise service_error(service_error.internal, 
2243                        "Unexpected state for %s" % key)
2244        self.state_lock.release()
2245        return rv
2246
2247    def check_experiment_access(self, fid, key):
2248        """
2249        Confirm that the fid has access to the experiment.  Though a request
2250        may be made in terms of a local name, the access attribute is always
2251        the experiment's fedid.
2252        """
2253        if not isinstance(key, fedid):
2254            key = self.get_experiment_fedid(key)
2255
2256        if self.auth.check_attribute(fid, key):
2257            return True
2258        else:
2259            raise service_error(service_error.access, "Access Denied")
2260
2261
2262    def get_handler(self, path, fid):
2263        self.log.info("Get handler %s %s" % (path, fid))
2264        if self.auth.check_attribute(fid, path):
2265            return ("%s/%s" % (self.repodir, path), "application/binary")
2266        else:
2267            return (None, None)
2268
2269    def get_vtopo(self, req, fid):
2270        """
2271        Return the stored virtual topology for this experiment
2272        """
2273        rv = None
2274        state = None
2275
2276        req = req.get('VtopoRequestBody', None)
2277        if not req:
2278            raise service_error(service_error.req,
2279                    "Bad request format (no VtopoRequestBody)")
2280        exp = req.get('experiment', None)
2281        if exp:
2282            if exp.has_key('fedid'):
2283                key = exp['fedid']
2284                keytype = "fedid"
2285            elif exp.has_key('localname'):
2286                key = exp['localname']
2287                keytype = "localname"
2288            else:
2289                raise service_error(service_error.req, "Unknown lookup type")
2290        else:
2291            raise service_error(service_error.req, "No request?")
2292
2293        self.check_experiment_access(fid, key)
2294
2295        self.state_lock.acquire()
2296        if self.state.has_key(key):
2297            if self.state[key].has_key('vtopo'):
2298                rv = { 'experiment' : {keytype: key },\
2299                        'vtopo': self.state[key]['vtopo'],\
2300                    }
2301            else:
2302                state = self.state[key]['experimentStatus']
2303        self.state_lock.release()
2304
2305        if rv: return rv
2306        else: 
2307            if state:
2308                raise service_error(service_error.partial, 
2309                        "Not ready: %s" % state)
2310            else:
2311                raise service_error(service_error.req, "No such experiment")
2312
2313    def get_vis(self, req, fid):
2314        """
2315        Return the stored visualization for this experiment
2316        """
2317        rv = None
2318        state = None
2319
2320        req = req.get('VisRequestBody', None)
2321        if not req:
2322            raise service_error(service_error.req,
2323                    "Bad request format (no VisRequestBody)")
2324        exp = req.get('experiment', None)
2325        if exp:
2326            if exp.has_key('fedid'):
2327                key = exp['fedid']
2328                keytype = "fedid"
2329            elif exp.has_key('localname'):
2330                key = exp['localname']
2331                keytype = "localname"
2332            else:
2333                raise service_error(service_error.req, "Unknown lookup type")
2334        else:
2335            raise service_error(service_error.req, "No request?")
2336
2337        self.check_experiment_access(fid, key)
2338
2339        self.state_lock.acquire()
2340        if self.state.has_key(key):
2341            if self.state[key].has_key('vis'):
2342                rv =  { 'experiment' : {keytype: key },\
2343                        'vis': self.state[key]['vis'],\
2344                        }
2345            else:
2346                state = self.state[key]['experimentStatus']
2347        self.state_lock.release()
2348
2349        if rv: return rv
2350        else:
2351            if state:
2352                raise service_error(service_error.partial, 
2353                        "Not ready: %s" % state)
2354            else:
2355                raise service_error(service_error.req, "No such experiment")
2356
2357    def clean_info_response(self, rv):
2358        """
2359        Remove the information in the experiment's state object that is not in
2360        the info response.
2361        """
2362        # Remove the owner info (should always be there, but...)
2363        if rv.has_key('owner'): del rv['owner']
2364
2365        # Convert the log into the allocationLog parameter and remove the
2366        # log entry (with defensive programming)
2367        if rv.has_key('log'):
2368            rv['allocationLog'] = "".join(rv['log'])
2369            del rv['log']
2370        else:
2371            rv['allocationLog'] = ""
2372
2373        if rv['experimentStatus'] != 'active':
2374            if rv.has_key('federant'): del rv['federant']
2375        else:
2376            # remove the allocationID and uri info from each federant
2377            for f in rv.get('federant', []):
2378                if f.has_key('allocID'): del f['allocID']
2379                if f.has_key('uri'): del f['uri']
2380        return rv
2381
2382    def get_info(self, req, fid):
2383        """
2384        Return all the stored info about this experiment
2385        """
2386        rv = None
2387
2388        req = req.get('InfoRequestBody', None)
2389        if not req:
2390            raise service_error(service_error.req,
2391                    "Bad request format (no InfoRequestBody)")
2392        exp = req.get('experiment', None)
2393        if exp:
2394            if exp.has_key('fedid'):
2395                key = exp['fedid']
2396                keytype = "fedid"
2397            elif exp.has_key('localname'):
2398                key = exp['localname']
2399                keytype = "localname"
2400            else:
2401                raise service_error(service_error.req, "Unknown lookup type")
2402        else:
2403            raise service_error(service_error.req, "No request?")
2404
2405        self.check_experiment_access(fid, key)
2406
2407        # The state may be massaged by the service function that called
2408        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2409        # state.
2410        self.state_lock.acquire()
2411        if self.state.has_key(key):
2412            rv = copy.deepcopy(self.state[key])
2413        self.state_lock.release()
2414
2415        if rv:
2416            return self.clean_info_response(rv)
2417        else:
2418            raise service_error(service_error.req, "No such experiment")
2419
2420    def get_multi_info(self, req, fid):
2421        """
2422        Return all the stored info that this fedid can access
2423        """
2424        rv = { 'info': [ ] }
2425
2426        self.state_lock.acquire()
2427        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2428            try:
2429                self.check_experiment_access(fid, key)
2430            except service_error, e:
2431                if e.code == service_error.access:
2432                    continue
2433                else:
2434                    self.state_lock.release()
2435                    raise e
2436
2437            if self.state.has_key(key):
2438                e = copy.deepcopy(self.state[key])
2439                e = self.clean_info_response(e)
2440                rv['info'].append(e)
2441        self.state_lock.release()
2442        return rv
2443
2444    def terminate_experiment(self, req, fid):
2445        """
2446        Swap this experiment out on the federants and delete the shared
2447        information
2448        """
2449        tbparams = { }
2450        req = req.get('TerminateRequestBody', None)
2451        if not req:
2452            raise service_error(service_error.req,
2453                    "Bad request format (no TerminateRequestBody)")
2454        force = req.get('force', False)
2455        exp = req.get('experiment', None)
2456        if exp:
2457            if exp.has_key('fedid'):
2458                key = exp['fedid']
2459                keytype = "fedid"
2460            elif exp.has_key('localname'):
2461                key = exp['localname']
2462                keytype = "localname"
2463            else:
2464                raise service_error(service_error.req, "Unknown lookup type")
2465        else:
2466            raise service_error(service_error.req, "No request?")
2467
2468        self.check_experiment_access(fid, key)
2469
2470        dealloc_list = [ ]
2471
2472
2473        # Create a logger that logs to the dealloc_list as well as to the main
2474        # log file.
2475        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2476        h = logging.StreamHandler(self.list_log(dealloc_list))
2477        # XXX: there should be a global one of these rather than repeating the
2478        # code.
2479        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2480                    '%d %b %y %H:%M:%S'))
2481        dealloc_log.addHandler(h)
2482
2483        self.state_lock.acquire()
2484        fed_exp = self.state.get(key, None)
2485
2486        if fed_exp:
2487            # This branch of the conditional holds the lock to generate a
2488            # consistent temporary tbparams variable to deallocate experiments.
2489            # It releases the lock to do the deallocations and reacquires it to
2490            # remove the experiment state when the termination is complete.
2491
2492            # First make sure that the experiment creation is complete.
2493            status = fed_exp.get('experimentStatus', None)
2494
2495            if status:
2496                if status in ('starting', 'terminating'):
2497                    if not force:
2498                        self.state_lock.release()
2499                        raise service_error(service_error.partial, 
2500                                'Experiment still being created or destroyed')
2501                    else:
2502                        self.log.warning('Experiment in %s state ' % status + \
2503                                'being terminated by force.')
2504            else:
2505                # No status??? trouble
2506                self.state_lock.release()
2507                raise service_error(service_error.internal,
2508                        "Experiment has no status!?")
2509
2510            ids = []
2511            #  experimentID is a list of dicts that are self-describing
2512            #  identifiers.  This finds all the fedids and localnames - the
2513            #  keys of self.state - and puts them into ids.
2514            for id in fed_exp.get('experimentID', []):
2515                if id.has_key('fedid'): ids.append(id['fedid'])
2516                if id.has_key('localname'): ids.append(id['localname'])
2517
2518            # Collect the allocation/segment ids into a dict keyed by the fedid
2519            # of the allocation (or a monotonically increasing integer) that
2520            # contains a tuple of uri, aid (which is a dict...)
2521            for i, fed in enumerate(fed_exp.get('federant', [])):
2522                try:
2523                    uri = fed['uri']
2524                    aid = fed['allocID']
2525                    k = fed['allocID'].get('fedid', i)
2526                except KeyError, e:
2527                    continue
2528                tbparams[k] = (uri, aid)
2529            fed_exp['experimentStatus'] = 'terminating'
2530            if self.state_filename: self.write_state()
2531            self.state_lock.release()
2532
2533            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2534            # then completes, so we can't wait if nothing starts.  So, no
2535            # tbparams, no start.
2536            if len(tbparams) > 0:
2537                thread_pool = self.thread_pool(self.nthreads)
2538                for k in tbparams.keys():
2539                    # Create and start a thread to stop the segment
2540                    thread_pool.wait_for_slot()
2541                    uri, aid = tbparams[k]
2542                    t  = self.pooled_thread(\
2543                            target=self.terminate_segment(log=dealloc_log,
2544                                testbed=uri,
2545                                cert_file=self.cert_file, 
2546                                cert_pwd=self.cert_pwd,
2547                                trusted_certs=self.trusted_certs,
2548                                caller=self.call_TerminateSegment),
2549                            args=(uri, aid), name=k,
2550                            pdata=thread_pool, trace_file=self.trace_file)
2551                    t.start()
2552                # Wait for completions
2553                thread_pool.wait_for_all_done()
2554
2555            # release the allocations (failed experiments have done this
2556            # already, and starting experiments may be in odd states, so we
2557            # ignore errors releasing those allocations
2558            try: 
2559                for k in tbparams.keys():
2560                    # This releases access by uri
2561                    uri, aid = tbparams[k]
2562                    self.release_access(None, aid, uri=uri)
2563            except service_error, e:
2564                if status != 'failed' and not force:
2565                    raise e
2566
2567            # Remove the terminated experiment
2568            self.state_lock.acquire()
2569            for id in ids:
2570                if self.state.has_key(id): del self.state[id]
2571
2572            if self.state_filename: self.write_state()
2573            self.state_lock.release()
2574
2575            return { 
2576                    'experiment': exp , 
2577                    'deallocationLog': "".join(dealloc_list),
2578                    }
2579        else:
2580            # Don't forget to release the lock
2581            self.state_lock.release()
2582            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.