source: fedd/federation/experiment_control.py @ b4624b2

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since b4624b2 was 641bb66, checked in by Ted Faber <faber@…>, 15 years ago

Remove one instance of experiment controller setting startcmds. The others
need to go as well.

  • Property mode set to 100644
File size: 87.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, tbparam, master, export_project,
728            access_user):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # Tweak search order so that if there are entries in access_user that
738        # have a project matching the export project, we try them first
739        if export_project and export_project.has_key('localname'): 
740            pn = export_project['localname'] 
741               
742            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
743            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
744        else: 
745            access_sequence = access_user
746
747        for p, u in access_sequence: 
748            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
749                    "to %s") %  ((p or "None"), u, uri))
750
751            if p:
752                # Request with user and project specified
753                req = {\
754                        'destinationTestbed' : { 'uri' : uri },
755                        'credential': [ "project: %s" % p, "user: %s"  % u],
756                        'allocID' : { 'localname': 'test' },
757                    }
758            else:
759                # Request with only user specified
760                req = {\
761                        'destinationTestbed' : { 'uri' : uri },
762                        'credential': [ 'user: %s' % u ],
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771
772            # node resources if any
773            if nodes != None and len(nodes) > 0:
774                rnodes = [ ]
775                for n in nodes:
776                    rn = { }
777                    image, hw, count = n.split(":")
778                    if image: rn['image'] = [ image ]
779                    if hw: rn['hardware'] = [ hw ]
780                    if count and int(count) >0 : rn['count'] = int(count)
781                    rnodes.append(rn)
782                req['resources']= { }
783                req['resources']['node'] = rnodes
784
785            try:
786                if self.local_access.has_key(uri):
787                    # Local access call
788                    req = { 'RequestAccessRequestBody' : req }
789                    r = self.local_access[uri].RequestAccess(req, 
790                            fedid(file=self.cert_file))
791                    r = { 'RequestAccessResponseBody' : r }
792                else:
793                    r = self.call_RequestAccess(uri, req, 
794                            self.cert_file, self.cert_pwd, self.trusted_certs)
795            except service_error, e:
796                if e.code == service_error.access:
797                    self.log.debug("[get_access] Access denied")
798                    r = None
799                    continue
800                else:
801                    raise e
802
803            if r.has_key('RequestAccessResponseBody'):
804                # Through to here we have a valid response, not a fault.
805                # Access denied is a fault, so something better or worse than
806                # access denied has happened.
807                r = r['RequestAccessResponseBody']
808                self.log.debug("[get_access] Access granted")
809                break
810            else:
811                raise service_error(service_error.protocol,
812                        "Bad proxy response")
813       
814        if not r:
815            raise service_error(service_error.access, 
816                    "Access denied by %s (%s)" % (tb, uri))
817
818        tbparam[tb] = { 
819                "allocID" : r['allocID'],
820                "uri": uri,
821                }
822
823        # Add attributes to parameter space.  We don't allow attributes to
824        # overlay any parameters already installed.
825        for a in r['fedAttr']:
826            try:
827                if a['attribute'] and \
828                        isinstance(a['attribute'], basestring)\
829                        and not tbparam[tb].has_key(a['attribute'].lower()):
830                    tbparam[tb][a['attribute'].lower()] = a['value']
831            except KeyError:
832                self.log.error("Bad attribute in response: %s" % a)
833
834    def release_access(self, tb, aid, uri=None):
835        """
836        Release access to testbed through fedd
837        """
838
839        if not uri:
840            uri = self.tbmap.get(tb, None)
841        if not uri:
842            raise service_error(service_error.server_config, 
843                    "Unknown testbed: %s" % tb)
844
845        if self.local_access.has_key(uri):
846            resp = self.local_access[uri].ReleaseAccess(\
847                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
848                    fedid(file=self.cert_file))
849            resp = { 'ReleaseAccessResponseBody': resp } 
850        else:
851            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
852                    self.cert_file, self.cert_pwd, self.trusted_certs)
853
854        # better error coding
855
856    def remote_splitter(self, uri, desc, master):
857
858        req = {
859                'description' : { 'ns2description': desc },
860                'master': master,
861                'include_fedkit': bool(self.fedkit),
862                'include_gatewaykit': bool(self.gatewaykit)
863            }
864
865        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
866                self.trusted_certs)
867
868        if r.has_key('Ns2SplitResponseBody'):
869            r = r['Ns2SplitResponseBody']
870            if r.has_key('output'):
871                return r['output'].splitlines()
872            else:
873                raise service_error(service_error.protocol, 
874                        "Bad splitter response (no output)")
875        else:
876            raise service_error(service_error.protocol, "Bad splitter response")
877
878    class start_segment:
879        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
880                cert_pwd=None, trusted_certs=None, caller=None,
881                log_collector=None):
882            self.log = log
883            self.debug = debug
884            self.cert_file = cert_file
885            self.cert_pwd = cert_pwd
886            self.trusted_certs = None
887            self.caller = caller
888            self.testbed = testbed
889            self.log_collector = log_collector
890            self.response = None
891
892        def __call__(self, uri, aid, topo, master, attrs=None):
893            req = {
894                    'allocID': { 'fedid' : aid }, 
895                    'segmentdescription': { 
896                        'topdldescription': topo.to_dict(),
897                    },
898                    'master': master,
899                }
900            if attrs:
901                req['fedAttr'] = attrs
902
903            try:
904                self.log.debug("Calling StartSegment at %s " % uri)
905                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
906                        self.trusted_certs)
907                if r.has_key('StartSegmentResponseBody'):
908                    lval = r['StartSegmentResponseBody'].get('allocationLog',
909                            None)
910                    if lval and self.log_collector:
911                        for line in  lval.splitlines(True):
912                            self.log_collector.write(line)
913                    self.response = r
914                else:
915                    raise service_error(service_error.internal, 
916                            "Bad response!?: %s" %r)
917                return True
918            except service_error, e:
919                self.log.error("Start segment failed on %s: %s" % \
920                        (self.testbed, e))
921                return False
922
923
924
925    class terminate_segment:
926        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
927                cert_pwd=None, trusted_certs=None, caller=None):
928            self.log = log
929            self.debug = debug
930            self.cert_file = cert_file
931            self.cert_pwd = cert_pwd
932            self.trusted_certs = None
933            self.caller = caller
934            self.testbed = testbed
935
936        def __call__(self, uri, aid ):
937            req = {
938                    'allocID': aid , 
939                }
940            try:
941                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
942                        self.trusted_certs)
943                return True
944            except service_error, e:
945                self.log.error("Terminate segment failed on %s: %s" % \
946                        (self.testbed, e))
947                return False
948   
949
950    def allocate_resources(self, allocated, master, eid, expid, 
951            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
952            attrs=None):
953        def get_vlan(r):
954            if r.has_key('StartSegmentResponseBody'):
955                srb = r['StartSegmentResponseBody']
956                if srb.has_key('fedAttr'):
957                    for k, v in [ (a['attribute'], a['value']) \
958                            for a in srb['fedAttr']]:
959                        if k == 'vlan': return v
960            return None
961
962        started = { }           # Testbeds where a sub-experiment started
963                                # successfully
964
965        # XXX
966        fail_soft = False
967
968        slaves = [ k for k in allocated.keys() \
969                if k != master and not topo[k].get_attribute('transit')]
970        transit = [ k for k in allocated.keys() \
971                if topo[k].get_attribute('transit')]
972
973        log = alloc_log or self.log
974
975        thread_pool = self.thread_pool(self.nthreads)
976        threads = [ ]
977
978        for tb in transit:
979            uri = tbparams[tb]['uri']
980            if tbparams[tb].has_key('allocID') and \
981                    tbparams[tb]['allocID'].has_key('fedid'):
982                aid = tbparams[tb]['allocID']['fedid']
983            else:
984                raise service_error(service_error.internal, 
985                        "No alloc id for testbed %s !?" % tb)
986
987            m = re.search('(\d+)', tb)
988            if m:
989                to_repl = "unassigned%s" % m.group(1)
990            else:
991                raise service_error(service_error.internal, 
992                        "Bad dynamic allocation name")
993                break
994
995            ss = self.start_segment(log=log, debug=self.debug, 
996                testbed=tb, cert_file=self.cert_file, 
997                cert_pwd=self.cert_pwd, 
998                trusted_certs=self.trusted_certs,
999                caller=self.call_StartSegment,
1000                log_collector=log_collector)
1001            t = self.pooled_thread(
1002                    target=ss,
1003                    args =(uri, aid, topo[tb], False, attrs),
1004                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1005            threads.append(t)
1006            t.start()
1007            # Wait until the this transit node finishes (keep pinging the log,
1008            # though)
1009
1010            mins = 0
1011            while not thread_pool.wait_for_all_done(60.0):
1012                mins += 1
1013                alloc_log.info("Waiting for transit (it has been %d mins)" \
1014                        % mins)
1015
1016            if t.rv:
1017                vlan = get_vlan(ss.response)
1018                if vlan is not None:
1019                    for k, t in topo.items():
1020                        for e in t.elements:
1021                            for i in e.interface:
1022                                vl = i.get_attribute('dragon_vlan')
1023                                if vl is not None and vl == to_repl:
1024                                    i.set_attribute('dragon_vlan', vlan)
1025            else:
1026                break
1027            thread_pool.clear()
1028
1029
1030        failed = [ t.getName() for t in threads if not t.rv ]
1031
1032        if len(failed) == 0:
1033            for tb in slaves:
1034                # Create and start a thread to start the segment, and save it
1035                # to get the return value later
1036                thread_pool.wait_for_slot()
1037                uri = self.tbmap.get(tb, None)
1038                if not uri:
1039                    raise service_error(service_error.internal, 
1040                            "Unknown testbed %s !?" % tb)
1041
1042                if tbparams[tb].has_key('allocID') and \
1043                        tbparams[tb]['allocID'].has_key('fedid'):
1044                    aid = tbparams[tb]['allocID']['fedid']
1045                else:
1046                    raise service_error(service_error.internal, 
1047                            "No alloc id for testbed %s !?" % tb)
1048
1049                t  = self.pooled_thread(\
1050                        target=self.start_segment(log=log, debug=self.debug,
1051                            testbed=tb, cert_file=self.cert_file,
1052                            cert_pwd=self.cert_pwd,
1053                            trusted_certs=self.trusted_certs,
1054                            caller=self.call_StartSegment,
1055                            log_collector=log_collector), 
1056                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1057                        pdata=thread_pool, trace_file=self.trace_file)
1058                threads.append(t)
1059                t.start()
1060
1061            # Wait until all finish (keep pinging the log, though)
1062            mins = 0
1063            while not thread_pool.wait_for_all_done(60.0):
1064                mins += 1
1065                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1066                        % mins)
1067
1068            thread_pool.clear()
1069
1070        # If none failed, start the master
1071        failed = [ t.getName() for t in threads if not t.rv ]
1072
1073        if len(failed) == 0:
1074            uri = self.tbmap.get(master, None)
1075            if not uri:
1076                raise service_error(service_error.internal, 
1077                        "Unknown testbed %s !?" % master)
1078
1079            if tbparams[master].has_key('allocID') and \
1080                    tbparams[master]['allocID'].has_key('fedid'):
1081                aid = tbparams[master]['allocID']['fedid']
1082            else:
1083                raise service_error(service_error.internal, 
1084                    "No alloc id for testbed %s !?" % master)
1085            t = self.pooled_thread(
1086                    target=self.start_segment(log=log, debug=self.debug, 
1087                        testbed=master, cert_file=self.cert_file, 
1088                        cert_pwd=self.cert_pwd, 
1089                        trusted_certs=self.trusted_certs,
1090                        caller=self.call_StartSegment,
1091                        log_collector=log_collector),
1092                    args =(uri, aid, topo[master], True, attrs),
1093                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1094            threads.append(t)
1095            t.start()
1096            # Wait until the master finishes (keep pinging the log, though)
1097            mins = 0
1098            while not thread_pool.wait_for_all_done(60.0):
1099                mins += 1
1100                alloc_log.info("Waiting for master (it has been %d mins)" \
1101                        % mins)
1102            # update failed to include the master, if it failed
1103            failed = [ t.getName() for t in threads if not t.rv ]
1104
1105        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1106        # If one failed clean up, unless fail_soft is set
1107        if failed:
1108            if not fail_soft:
1109                thread_pool.clear()
1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
1112                    thread_pool.wait_for_slot()
1113                    uri = tbparams[tb]['uri']
1114                    t  = self.pooled_thread(\
1115                            target=self.terminate_segment(log=log,
1116                                testbed=tb,
1117                                cert_file=self.cert_file, 
1118                                cert_pwd=self.cert_pwd,
1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
1121                            args=(uri, tbparams[tb]['federant']['allocID']),
1122                            name=tb,
1123                            pdata=thread_pool, trace_file=self.trace_file)
1124                    t.start()
1125                # Wait until all finish
1126                thread_pool.wait_for_all_done()
1127
1128                # release the allocations
1129                for tb in tbparams.keys():
1130                    self.release_access(tb, tbparams[tb]['allocID'],
1131                            tbparams[tb].get('uri', None))
1132                # Remove the placeholder
1133                self.state_lock.acquire()
1134                self.state[eid]['experimentStatus'] = 'failed'
1135                if self.state_filename: self.write_state()
1136                self.state_lock.release()
1137
1138                log.error("Swap in failed on %s" % ",".join(failed))
1139                return
1140        else:
1141            log.info("[start_segment]: Experiment %s active" % eid)
1142
1143
1144        # Walk up tmpdir, deleting as we go
1145        if self.cleanup:
1146            log.debug("[start_experiment]: removing %s" % tmpdir)
1147            for path, dirs, files in os.walk(tmpdir, topdown=False):
1148                for f in files:
1149                    os.remove(os.path.join(path, f))
1150                for d in dirs:
1151                    os.rmdir(os.path.join(path, d))
1152            os.rmdir(tmpdir)
1153        else:
1154            log.debug("[start_experiment]: not removing %s" % tmpdir)
1155
1156        # Insert the experiment into our state and update the disk copy
1157        self.state_lock.acquire()
1158        self.state[expid]['experimentStatus'] = 'active'
1159        self.state[eid] = self.state[expid]
1160        if self.state_filename: self.write_state()
1161        self.state_lock.release()
1162        return
1163
1164
1165    def add_kit(self, e, kit):
1166        """
1167        Add a Software object created from the list of (install, location)
1168        tuples passed as kit  to the software attribute of an object e.  We
1169        do this enough to break out the code, but it's kind of a hack to
1170        avoid changing the old tuple rep.
1171        """
1172
1173        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1174
1175        if isinstance(e.software, list): e.software.extend(s)
1176        else: e.software = s
1177
1178
1179    def create_experiment_state(self, fid, req, expid, expcert, 
1180            state='starting'):
1181        """
1182        Create the initial entry in the experiment's state.  The expid and
1183        expcert are the experiment's fedid and certifacte that represents that
1184        ID, which are installed in the experiment state.  If the request
1185        includes a suggested local name that is used if possible.  If the local
1186        name is already taken by an experiment owned by this user that has
1187        failed, it is overwritten.  Otherwise new letters are added until a
1188        valid localname is found.  The generated local name is returned.
1189        """
1190
1191        if req.has_key('experimentID') and \
1192                req['experimentID'].has_key('localname'):
1193            overwrite = False
1194            eid = req['experimentID']['localname']
1195            # If there's an old failed experiment here with the same local name
1196            # and accessible by this user, we'll overwrite it, otherwise we'll
1197            # fall through and do the collision avoidance.
1198            old_expid = self.get_experiment_fedid(eid)
1199            if old_expid and self.check_experiment_access(fid, old_expid):
1200                self.state_lock.acquire()
1201                status = self.state[eid].get('experimentStatus', None)
1202                if status and status == 'failed':
1203                    # remove the old access attribute
1204                    self.auth.unset_attribute(fid, old_expid)
1205                    overwrite = True
1206                    del self.state[eid]
1207                    del self.state[old_expid]
1208                self.state_lock.release()
1209            self.state_lock.acquire()
1210            while (self.state.has_key(eid) and not overwrite):
1211                eid += random.choice(string.ascii_letters)
1212            # Initial state
1213            self.state[eid] = {
1214                    'experimentID' : \
1215                            [ { 'localname' : eid }, {'fedid': expid } ],
1216                    'experimentStatus': state,
1217                    'experimentAccess': { 'X509' : expcert },
1218                    'owner': fid,
1219                    'log' : [],
1220                }
1221            self.state[expid] = self.state[eid]
1222            if self.state_filename: self.write_state()
1223            self.state_lock.release()
1224        else:
1225            eid = self.exp_stem
1226            for i in range(0,5):
1227                eid += random.choice(string.ascii_letters)
1228            self.state_lock.acquire()
1229            while (self.state.has_key(eid)):
1230                eid = self.exp_stem
1231                for i in range(0,5):
1232                    eid += random.choice(string.ascii_letters)
1233            # Initial state
1234            self.state[eid] = {
1235                    'experimentID' : \
1236                            [ { 'localname' : eid }, {'fedid': expid } ],
1237                    'experimentStatus': state,
1238                    'experimentAccess': { 'X509' : expcert },
1239                    'owner': fid,
1240                    'log' : [],
1241                }
1242            self.state[expid] = self.state[eid]
1243            if self.state_filename: self.write_state()
1244            self.state_lock.release()
1245
1246        return eid
1247
1248
1249    def allocate_ips_to_topo(self, top):
1250        """
1251        Add an ip4_address attribute to all the hosts in the topology, based on
1252        the shared substrates on which they sit.  An /etc/hosts file is also
1253        created and returned as a list of hostfiles entries.  We also return
1254        the allocator, because we may need to allocate IPs to portals
1255        (specifically DRAGON portals).
1256        """
1257        subs = sorted(top.substrates, 
1258                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1259                reverse=True)
1260        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1261        ifs = { }
1262        hosts = [ ]
1263
1264        for idx, s in enumerate(subs):
1265            a = ips.allocate(len(s.interfaces)+2)
1266            if a :
1267                base, num = a
1268                if num < len(s.interfaces) +2 : 
1269                    raise service_error(service_error.internal,
1270                            "Allocator returned wrong number of IPs??")
1271            else:
1272                raise service_error(service_error.req, 
1273                        "Cannot allocate IP addresses")
1274
1275            base += 1
1276            for i in s.interfaces:
1277                i.attribute.append(
1278                        topdl.Attribute('ip4_address', 
1279                            "%s" % ip_addr(base)))
1280                hname = i.element.name[0]
1281                if ifs.has_key(hname):
1282                    hosts.append("%s\t%s-%s %s-%d" % \
1283                            (ip_addr(base), hname, s.name, hname,
1284                                ifs[hname]))
1285                else:
1286                    ifs[hname] = 0
1287                    hosts.append("%s\t%s-%s %s-%d %s" % \
1288                            (ip_addr(base), hname, s.name, hname,
1289                                ifs[hname], hname))
1290
1291                ifs[hname] += 1
1292                base += 1
1293        return hosts, ips
1294
1295    def get_access_to_testbeds(self, testbeds, access_user, 
1296            export_project, master, allocated, tbparams):
1297        """
1298        Request access to the various testbeds required for this instantiation
1299        (passed in as testbeds).  User, access_user, expoert_project and master
1300        are used to construct the correct requests.  Per-testbed parameters are
1301        returned in tbparams.
1302        """
1303        for tb in testbeds:
1304            self.get_access(tb, None, tbparams, master,
1305                    export_project, access_user)
1306            allocated[tb] = 1
1307
1308    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1309        """
1310        Create the sub-topologies that are needed for experimetn instantiation.
1311        Along the way attach startup commands to the computers in the
1312        subtopologies.
1313        """
1314        for tb in testbeds:
1315            topo[tb] = top.clone()
1316            to_delete = [ ]
1317            for e in topo[tb].elements:
1318                etb = e.get_attribute('testbed')
1319                if etb and etb != tb:
1320                    for i in e.interface:
1321                        for s in i.subs:
1322                            try:
1323                                s.interfaces.remove(i)
1324                            except ValueError:
1325                                raise service_error(service_error.internal,
1326                                        "Can't remove interface??")
1327                    to_delete.append(e)
1328            for e in to_delete:
1329                topo[tb].elements.remove(e)
1330            topo[tb].make_indices()
1331
1332            for e in [ e for e in topo[tb].elements \
1333                    if isinstance(e,topdl.Computer)]:
1334                if tb == master:
1335                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1336                else:
1337                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1338                scmd = e.get_attribute('startup')
1339                if scmd:
1340                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1341
1342                e.set_attribute('startup', cmd)
1343                if self.fedkit: self.add_kit(e, self.fedkit)
1344
1345    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1346            portal_type, iface_desc=()):
1347        sproject = tbparams[st].get('project', 'project')
1348        dproject = tbparams[dt].get('project', 'project')
1349        mproject = tbparams[master].get('project', 'project')
1350        sdomain = tbparams[st].get('domain', ".example.com")
1351        ddomain = tbparams[dt].get('domain', ".example.com")
1352        mdomain = tbparams[master].get('domain', '.example.com')
1353        muser = tbparams[master].get('user', 'root')
1354        smbshare = tbparams[master].get('smbshare', 'USERS')
1355        aid = tbparams[dt]['allocID']['fedid']
1356        if st == master or dt == master:
1357            active = ("%s" % (st == master))
1358        else:
1359            active = ("%s" %(st > dt))
1360
1361        ifaces = [ ]
1362        for sub, attrs in iface_desc:
1363            inf = topdl.Interface(
1364                    substrate=sub,
1365                    attribute=[
1366                        topdl.Attribute(
1367                            attribute=n,
1368                            value = v)
1369                        for n, v in attrs
1370                        ]
1371                    )
1372            ifaces.append(inf)
1373        return topdl.Computer(
1374                name=myname,
1375                attribute=[ 
1376                    topdl.Attribute(attribute=n,value=v)
1377                        for n, v in (\
1378                            ('portal', 'true'),
1379                            ('domain', sdomain),
1380                            ('masterdomain', mdomain),
1381                            ('masterexperiment', "%s/%s" % \
1382                                    (mproject, eid)),
1383                            ('masteruser', muser),
1384                            ('smbshare', smbshare),
1385                            ('experiment', "%s/%s" % \
1386                                    (sproject, eid)),
1387                            ('peer', "%s" % desthost),
1388                            ('peer_segment', "%s" % aid),
1389                            ('scriptdir', 
1390                                "/usr/local/federation/bin"),
1391                            ('active', "%s" % active),
1392                            ('portal_type', portal_type), 
1393                        )
1394                    ],
1395                interface=ifaces,
1396                )
1397
1398    def new_portal_substrate(self, st, dt, eid, tbparams):
1399        ddomain = tbparams[dt].get('domain', ".example.com")
1400        dproject = tbparams[dt].get('project', 'project')
1401        tsubstrate = \
1402                topdl.Substrate(name='%s-%s' % (st, dt),
1403                        attribute= [
1404                            topdl.Attribute(
1405                                attribute='portal',
1406                                value='true')
1407                            ]
1408                        )
1409        segment_element = topdl.Segment(
1410                id= tbparams[dt]['allocID'],
1411                type='emulab',
1412                uri = self.tbmap.get(dt, None),
1413                interface=[ 
1414                    topdl.Interface(
1415                        substrate=tsubstrate.name),
1416                    ],
1417                attribute = [
1418                    topdl.Attribute(attribute=n, value=v)
1419                        for n, v in (\
1420                            ('domain', ddomain),
1421                            ('experiment', "%s/%s" % \
1422                                    (dproject, eid)),)
1423                    ],
1424                )
1425
1426        return (tsubstrate, segment_element)
1427
1428    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1429        if sub.capacity is None:
1430            raise service_error(service_error.internal,
1431                    "Cannot DRAGON split substrate w/o capacity")
1432        segs = [ ]
1433        substr = topdl.Substrate(name="dragon%d" % idx, 
1434                capacity=sub.capacity.clone(),
1435                attribute=[ topdl.Attribute(attribute=n, value=v)
1436                    for n, v, in (\
1437                            ('vlan', 'unassigned%d' % idx),)])
1438        for tb in tbs.keys():
1439            seg = topdl.Segment(
1440                    id = tbparams[tb]['allocID'],
1441                    type='emulab',
1442                    uri = self.tbmap.get(tb, None),
1443                    interface=[ 
1444                        topdl.Interface(
1445                            substrate=substr.name),
1446                        ],
1447                    attribute=[ topdl.Attribute(
1448                        attribute='dragon_endpoint', 
1449                        value=tbparams[tb]['dragon']),
1450                        ]
1451                    )
1452            if tbparams[tb].has_key('vlans'):
1453                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1454            segs.append(seg)
1455
1456        topo["dragon%d" %idx] = \
1457                topdl.Topology(substrates=[substr], elements=segs,
1458                        attribute=[
1459                            topdl.Attribute(attribute="transit", value='true'),
1460                            topdl.Attribute(attribute="dynamic", value='true'),
1461                            topdl.Attribute(attribute="testbed", value='dragon'),
1462                            ]
1463                        )
1464
1465    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1466        """
1467        Add attribiutes to the various elements indicating that they are to be
1468        dragon connected and create a dragon segment in tops to be
1469        instantiated.
1470        """
1471
1472        def get_substrate_from_topo(name, t):
1473            for s in t.substrates:
1474                if s.name == name: return s
1475            else: return None
1476
1477        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1478        elements = [ i.element for i in sub.interfaces ]
1479        count = { }
1480        for e in elements:
1481            tb = e.get_attribute('testbed')
1482            count[tb] = count.get(tb, 0) + 1
1483
1484        for tb in tbs.keys():
1485            s = get_substrate_from_topo(sub.name, topo[tb])
1486            if s:
1487                for i in s.interfaces:
1488                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1489                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1490                    else: i.set_attribute('dragon_type', 'link')
1491            else:
1492                raise service_error(service_error.internal,
1493                        "No substrate %s in testbed %s" % (sub.name, tb))
1494
1495        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1496
1497    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1498            segment_substrate, portals):
1499        # More than one testbed is on this substrate.  Insert
1500        # some portals into the subtopologies.  st == source testbed,
1501        # dt == destination testbed.
1502        for st in tbs.keys():
1503            if not segment_substrate.has_key(st):
1504                segment_substrate[st] = { }
1505            if not portals.has_key(st): 
1506                portals[st] = { }
1507            for dt in [ t for t in tbs.keys() if t != st]:
1508                sproject = tbparams[st].get('project', 'project')
1509                dproject = tbparams[dt].get('project', 'project')
1510                mproject = tbparams[master].get('project', 'project')
1511                sdomain = tbparams[st].get('domain', ".example.com")
1512                ddomain = tbparams[dt].get('domain', ".example.com")
1513                mdomain = tbparams[master].get('domain', '.example.com')
1514                muser = tbparams[master].get('user', 'root')
1515                smbshare = tbparams[master].get('smbshare', 'USERS')
1516                aid = tbparams[dt]['allocID']['fedid']
1517                if st == master or dt == master:
1518                    active = ("%s" % (st == master))
1519                else:
1520                    active = ("%s" %(st > dt))
1521                if not segment_substrate[st].has_key(dt):
1522                    # Put a substrate and a segment for the connected
1523                    # testbed in there.
1524                    tsubstrate, segment_element = \
1525                            self.new_portal_substrate(st, dt, eid, tbparams)
1526                    segment_substrate[st][dt] = tsubstrate
1527                    topo[st].substrates.append(tsubstrate)
1528                    topo[st].elements.append(segment_element)
1529
1530                new_portal = False
1531                if portals[st].has_key(dt):
1532                    # There's a portal set up to go to this destination.
1533                    # See if there's room to multiples this connection on
1534                    # it.  If so, add an interface to the portal; if not,
1535                    # set up to add a portal below.
1536                    # [This little festival of braces is just a pop of the
1537                    # last element in the list of portals between st and
1538                    # dt.]
1539                    portal = portals[st][dt][-1]
1540                    mux = len([ i for i in portal.interface \
1541                            if not i.get_attribute('portal')])
1542                    if mux == self.muxmax:
1543                        new_portal = True
1544                        portal_type = "experiment"
1545                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1546                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1547                    else:
1548                        new_i = topdl.Interface(
1549                                substrate=sub.name,
1550                                attribute=[ 
1551                                    topdl.Attribute(
1552                                        attribute='ip4_address', 
1553                                        value=tbs[dt]
1554                                    )
1555                                ])
1556                        portal.interface.append(new_i)
1557                else:
1558                    # First connection to this testbed, make an empty list
1559                    # and set up to add the new portal below
1560                    new_portal = True
1561                    portals[st][dt] = [ ]
1562                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1563                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1564
1565                    if dt == master or st == master: portal_type = "both"
1566                    else: portal_type = "experiment"
1567
1568                if new_portal:
1569                    infs = (
1570                            (segment_substrate[st][dt].name, 
1571                                (('portal', 'true'),)),
1572                            (sub.name, 
1573                                (('ip4_address', tbs[dt]),))
1574                        )
1575                    portal =  self.new_portal_node(st, dt, tbparams, 
1576                            master, eid, myname, desthost, portal_type,
1577                            infs)
1578                    if self.fedkit:
1579                        self.add_kit(portal, self.fedkit)
1580                    if self.gatewaykit: 
1581                        self.add_kit(portal, self.gatewaykit)
1582
1583                    topo[st].elements.append(portal)
1584                    portals[st][dt].append(portal)
1585
1586    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1587        # Add to the master testbed
1588        tsubstrate, segment_element = \
1589                self.new_portal_substrate(st, dt, eid, tbparams)
1590        myname = "%stunnel" % dt
1591        desthost = "%stunnel" % st
1592
1593        portal = self.new_portal_node(st, dt, tbparams, master,
1594                eid, myname, desthost, "control", 
1595                ((tsubstrate.name,(('portal','true'),)),))
1596        if self.fedkit:
1597            self.add_kit(portal, self.fedkit)
1598        if self.gatewaykit: 
1599            self.add_kit(portal, self.gatewaykit)
1600
1601        topo[st].substrates.append(tsubstrate)
1602        topo[st].elements.append(segment_element)
1603        topo[st].elements.append(portal)
1604
1605    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1606            substrate, tbparams):
1607        # Add to the master testbed
1608        myname = "%stunnel" % dt
1609        desthost = "%s" % ip_addr(dip)
1610
1611        portal = self.new_portal_node(st, dt, tbparams, master,
1612                eid, myname, desthost, "control", 
1613                ((substrate.name,(
1614                    ('portal','true'),
1615                    ('ip4_address', "%s" % ip_addr(myip)),
1616                    ('dragon_vlan', 'unassigned%d' % idx),
1617                    ('dragon_type', 'link'),)),))
1618        if self.fedkit:
1619            self.add_kit(portal, self.fedkit)
1620        if self.gatewaykit: 
1621            self.add_kit(portal, self.gatewaykit)
1622
1623        return portal
1624
1625    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1626        """
1627        For each substrate in the main topology, find those that
1628        have nodes on more than one testbed.  Insert portal nodes
1629        into the copies of those substrates on the sub topologies.
1630        """
1631        segment_substrate = { }
1632        portals = { }
1633        for s in top.substrates:
1634            # tbs will contain an ip address on this subsrate that is in
1635            # each testbed.
1636            tbs = { }
1637            for i in s.interfaces:
1638                e = i.element
1639                tb = e.get_attribute('testbed')
1640                if tb and not tbs.has_key(tb):
1641                    for i in e.interface:
1642                        if s in i.subs:
1643                            tbs[tb]= i.get_attribute('ip4_address')
1644            if len(tbs) < 2:
1645                continue
1646
1647            # DRAGON will not create multi-site vlans yet
1648            if len(tbs) == 2 and \
1649                    all([tbparams[x].has_key('dragon') for x in tbs]):
1650                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1651                        master, eid)
1652            else:
1653                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1654                        eid, segment_substrate, portals)
1655
1656        # Make sure that all the slaves have a control portal back to the
1657        # master.
1658        for tb in [ t for t in tbparams.keys() if t != master ]:
1659            if len([e for e in topo[tb].elements \
1660                    if isinstance(e, topdl.Computer) and \
1661                    e.get_attribute('portal') and \
1662                    e.get_attribute('portal_type') == 'both']) == 0:
1663
1664                if tbparams[master].has_key('dragon') \
1665                        and tbparams[tb].has_key('dragon'):
1666
1667                    idx = len([x for x in topo.keys() \
1668                            if x.startswith('dragon')])
1669                    dip, leng = ip_allocator.allocate(4)
1670                    dip += 1
1671                    mip = dip+1
1672                    csub = topdl.Substrate(
1673                            name="dragon-control-%s" % tb,
1674                            capacity=topdl.Capacity(100000.0, 'max'),
1675                            attribute=[
1676                                topdl.Attribute(
1677                                    attribute='portal',
1678                                    value='true'
1679                                    )
1680                                ]
1681                            )
1682                    seg = topdl.Segment(
1683                            id= tbparams[master]['allocID'],
1684                            type='emulab',
1685                            uri = self.tbmap.get(master, None),
1686                            interface=[ 
1687                                topdl.Interface(
1688                                    substrate=csub.name),
1689                                ],
1690                            attribute = [
1691                                topdl.Attribute(attribute=n, value=v)
1692                                    for n, v in (\
1693                                        ('domain', 
1694                                            tbparams[master].get('domain',
1695                                                ".example.com")),
1696                                        ('experiment', "%s/%s" % \
1697                                                (tbparams[master].get(
1698                                                    'project', 
1699                                                    'project'), 
1700                                                    eid)),)
1701                                ],
1702                            )
1703                    topo[tb].substrates.append(csub)
1704                    topo[tb].elements.append(
1705                            self.new_dragon_portal(tb, master, master, eid, 
1706                                dip, mip, idx, csub, tbparams))
1707                    topo[tb].elements.append(seg)
1708
1709                    mcsub = csub.clone()
1710                    seg = topdl.Segment(
1711                            id= tbparams[tb]['allocID'],
1712                            type='emulab',
1713                            uri = self.tbmap.get(tb, None),
1714                            interface=[ 
1715                                topdl.Interface(
1716                                    substrate=csub.name),
1717                                ],
1718                            attribute = [
1719                                topdl.Attribute(attribute=n, value=v)
1720                                    for n, v in (\
1721                                        ('domain', 
1722                                            tbparams[tb].get('domain',
1723                                                ".example.com")),
1724                                        ('experiment', "%s/%s" % \
1725                                                (tbparams[tb].get('project', 
1726                                                    'project'), 
1727                                                    eid)),)
1728                                ],
1729                            )
1730                    topo[master].substrates.append(mcsub)
1731                    topo[master].elements.append(
1732                            self.new_dragon_portal(master, tb, master, eid, 
1733                                mip, dip, idx, mcsub, tbparams))
1734                    topo[master].elements.append(seg)
1735
1736                    self.create_dragon_substrate(csub, topo, 
1737                            {tb: 1, master:1}, tbparams, master, eid)
1738                else:
1739                    self.add_control_portal(master, tb, master, eid, topo, 
1740                            tbparams)
1741                    self.add_control_portal(tb, master, master, eid, topo, 
1742                            tbparams)
1743
1744        # Connect the portal nodes into the topologies and clear out
1745        # substrates that are not in the topologies
1746        for tb in tbparams.keys():
1747            topo[tb].incorporate_elements()
1748            topo[tb].substrates = \
1749                    [s for s in topo[tb].substrates \
1750                        if len(s.interfaces) >0]
1751
1752    def wrangle_software(self, expid, top, topo, tbparams):
1753        """
1754        Copy software out to the repository directory, allocate permissions and
1755        rewrite the segment topologies to look for the software in local
1756        places.
1757        """
1758
1759        # Copy the rpms and tarfiles to a distribution directory from
1760        # which the federants can retrieve them
1761        linkpath = "%s/software" %  expid
1762        softdir ="%s/%s" % ( self.repodir, linkpath)
1763        softmap = { }
1764        # These are in a list of tuples format (each kit).  This comprehension
1765        # unwraps them into a single list of tuples that initilaizes the set of
1766        # tuples.
1767        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1768                for p, t in l ])
1769        pkgs.update([x.location for e in top.elements \
1770                for x in e.software])
1771        try:
1772            os.makedirs(softdir)
1773        except IOError, e:
1774            raise service_error(
1775                    "Cannot create software directory: %s" % e)
1776        # The actual copying.  Everything's converted into a url for copying.
1777        for pkg in pkgs:
1778            loc = pkg
1779
1780            scheme, host, path = urlparse(loc)[0:3]
1781            dest = os.path.basename(path)
1782            if not scheme:
1783                if not loc.startswith('/'):
1784                    loc = "/%s" % loc
1785                loc = "file://%s" %loc
1786            try:
1787                u = urlopen(loc)
1788            except Exception, e:
1789                raise service_error(service_error.req, 
1790                        "Cannot open %s: %s" % (loc, e))
1791            try:
1792                f = open("%s/%s" % (softdir, dest) , "w")
1793                self.log.debug("Writing %s/%s" % (softdir,dest) )
1794                data = u.read(4096)
1795                while data:
1796                    f.write(data)
1797                    data = u.read(4096)
1798                f.close()
1799                u.close()
1800            except Exception, e:
1801                raise service_error(service_error.internal,
1802                        "Could not copy %s: %s" % (loc, e))
1803            path = re.sub("/tmp", "", linkpath)
1804            # XXX
1805            softmap[pkg] = \
1806                    "%s/%s/%s" %\
1807                    ( self.repo_url, path, dest)
1808
1809            # Allow the individual segments to access the software.
1810            for tb in tbparams.keys():
1811                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1812                        "/%s/%s" % ( path, dest))
1813
1814        # Convert the software locations in the segments into the local
1815        # copies on this host
1816        for soft in [ s for tb in topo.values() \
1817                for e in tb.elements \
1818                    if getattr(e, 'software', False) \
1819                        for s in e.software ]:
1820            if softmap.has_key(soft.location):
1821                soft.location = softmap[soft.location]
1822
1823
1824    def new_experiment(self, req, fid):
1825        """
1826        The external interface to empty initial experiment creation called from
1827        the dispatcher.
1828
1829        Creates a working directory, splits the incoming description using the
1830        splitter script and parses out the avrious subsections using the
1831        lcasses above.  Once each sub-experiment is created, use pooled threads
1832        to instantiate them and start it all up.
1833        """
1834        if not self.auth.check_attribute(fid, 'new'):
1835            raise service_error(service_error.access, "New access denied")
1836
1837        try:
1838            tmpdir = tempfile.mkdtemp(prefix="split-")
1839        except IOError:
1840            raise service_error(service_error.internal, "Cannot create tmp dir")
1841
1842        try:
1843            access_user = self.accessdb[fid]
1844        except KeyError:
1845            raise service_error(service_error.internal,
1846                    "Access map and authorizer out of sync in " + \
1847                            "new_experiment for fedid %s"  % fid)
1848
1849        pid = "dummy"
1850        gid = "dummy"
1851
1852        req = req.get('NewRequestBody', None)
1853        if not req:
1854            raise service_error(service_error.req,
1855                    "Bad request format (no NewRequestBody)")
1856
1857        # Generate an ID for the experiment (slice) and a certificate that the
1858        # allocator can use to prove they own it.  We'll ship it back through
1859        # the encrypted connection.
1860        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1861
1862        #now we're done with the tmpdir, and it should be empty
1863        if self.cleanup:
1864            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1865            os.rmdir(tmpdir)
1866        else:
1867            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1868
1869        eid = self.create_experiment_state(fid, req, expid, expcert, 
1870                state='empty')
1871
1872        # Let users touch the state
1873        self.auth.set_attribute(fid, expid)
1874        self.auth.set_attribute(expid, expid)
1875        # Override fedids can manipulate state as well
1876        for o in self.overrides:
1877            self.auth.set_attribute(o, expid)
1878
1879        rv = {
1880                'experimentID': [
1881                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1882                ],
1883                'experimentStatus': 'empty',
1884                'experimentAccess': { 'X509' : expcert }
1885            }
1886
1887        return rv
1888
1889
1890    def create_experiment(self, req, fid):
1891        """
1892        The external interface to experiment creation called from the
1893        dispatcher.
1894
1895        Creates a working directory, splits the incoming description using the
1896        splitter script and parses out the avrious subsections using the
1897        lcasses above.  Once each sub-experiment is created, use pooled threads
1898        to instantiate them and start it all up.
1899        """
1900
1901        req = req.get('CreateRequestBody', None)
1902        if not req:
1903            raise service_error(service_error.req,
1904                    "Bad request format (no CreateRequestBody)")
1905
1906        # Get the experiment access
1907        exp = req.get('experimentID', None)
1908        if exp:
1909            if exp.has_key('fedid'):
1910                key = exp['fedid']
1911                expid = key
1912                eid = None
1913            elif exp.has_key('localname'):
1914                key = exp['localname']
1915                eid = key
1916                expid = None
1917            else:
1918                raise service_error(service_error.req, "Unknown lookup type")
1919        else:
1920            raise service_error(service_error.req, "No request?")
1921
1922        self.check_experiment_access(fid, key)
1923
1924        try:
1925            tmpdir = tempfile.mkdtemp(prefix="split-")
1926            os.mkdir(tmpdir+"/keys")
1927        except IOError:
1928            raise service_error(service_error.internal, "Cannot create tmp dir")
1929
1930        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1931        gw_secretkey_base = "fed.%s" % self.ssh_type
1932        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1933        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1934        tclfile = tmpdir + "/experiment.tcl"
1935        tbparams = { }
1936        try:
1937            access_user = self.accessdb[fid]
1938        except KeyError:
1939            raise service_error(service_error.internal,
1940                    "Access map and authorizer out of sync in " + \
1941                            "create_experiment for fedid %s"  % fid)
1942
1943        pid = "dummy"
1944        gid = "dummy"
1945
1946        # The tcl parser needs to read a file so put the content into that file
1947        descr=req.get('experimentdescription', None)
1948        if descr:
1949            file_content=descr.get('ns2description', None)
1950            if file_content:
1951                try:
1952                    f = open(tclfile, 'w')
1953                    f.write(file_content)
1954                    f.close()
1955                except IOError:
1956                    raise service_error(service_error.internal,
1957                            "Cannot write temp experiment description")
1958            else:
1959                raise service_error(service_error.req, 
1960                        "Only ns2descriptions supported")
1961        else:
1962            raise service_error(service_error.req, "No experiment description")
1963
1964        self.state_lock.acquire()
1965        if self.state.has_key(key):
1966            self.state[key]['experimentStatus'] = "starting"
1967            for e in self.state[key].get('experimentID',[]):
1968                if not expid and e.has_key('fedid'):
1969                    expid = e['fedid']
1970                elif not eid and e.has_key('localname'):
1971                    eid = e['localname']
1972        self.state_lock.release()
1973
1974        if not (eid and expid):
1975            raise service_error(service_error.internal, 
1976                    "Cannot find local experiment info!?")
1977
1978        try: 
1979            # This catches exceptions to clear the placeholder if necessary
1980            try:
1981                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1982            except ValueError:
1983                raise service_error(service_error.server_config, 
1984                        "Bad key type (%s)" % self.ssh_type)
1985
1986            master = req.get('master', None)
1987            if not master:
1988                raise service_error(service_error.req,
1989                        "No master testbed label")
1990            export_project = req.get('exportProject', None)
1991            if not export_project:
1992                raise service_error(service_error.req, "No export project")
1993           
1994            # Translate to topdl
1995            if self.splitter_url:
1996                # XXX: need remote topdl translator
1997                self.log.debug("Calling remote splitter at %s" % \
1998                        self.splitter_url)
1999                split_data = self.remote_splitter(self.splitter_url,
2000                        file_content, master)
2001            else:
2002                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2003                    str(self.muxmax), '-m', master]
2004
2005                if self.fedkit:
2006                    tclcmd.append('-k')
2007
2008                if self.gatewaykit:
2009                    tclcmd.append('-K')
2010
2011                tclcmd.extend([pid, gid, eid, tclfile])
2012
2013                self.log.debug("running local splitter %s", " ".join(tclcmd))
2014                # This is just fantastic.  As a side effect the parser copies
2015                # tb_compat.tcl into the current directory, so that directory
2016                # must be writable by the fedd user.  Doing this in the
2017                # temporary subdir ensures this is the case.
2018                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2019                        cwd=tmpdir)
2020                split_data = tclparser.stdout
2021
2022            top = topdl.topology_from_xml(file=split_data, top="experiment")
2023
2024            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2025             # Find the testbeds to look up
2026            testbeds = set([ a.value for e in top.elements \
2027                    for a in e.attribute \
2028                    if a.attribute == 'testbed'] )
2029
2030            allocated = { }         # Testbeds we can access
2031            topo ={ }               # Sub topologies
2032            self.get_access_to_testbeds(testbeds, access_user, 
2033                    export_project, master, allocated, tbparams)
2034            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2035
2036            # Copy configuration files into the remote file store
2037            # The config urlpath
2038            configpath = "/%s/config" % expid
2039            # The config file system location
2040            configdir ="%s%s" % ( self.repodir, configpath)
2041            try:
2042                os.makedirs(configdir)
2043            except IOError, e:
2044                raise service_error(
2045                        "Cannot create config directory: %s" % e)
2046            try:
2047                f = open("%s/hosts" % configdir, "w")
2048                f.write('\n'.join(hosts))
2049                f.close()
2050            except IOError, e:
2051                raise service_error(service_error.internal, 
2052                        "Cannot write hosts file: %s" % e)
2053            try:
2054                copy_file("%s" % gw_pubkey, "%s/%s" % \
2055                        (configdir, gw_pubkey_base))
2056                copy_file("%s" % gw_secretkey, "%s/%s" % \
2057                        (configdir, gw_secretkey_base))
2058            except IOError, e:
2059                raise service_error(service_error.internal, 
2060                        "Cannot copy keyfiles: %s" % e)
2061
2062            # Allow the individual testbeds to access the configuration files.
2063            for tb in tbparams.keys():
2064                asignee = tbparams[tb]['allocID']['fedid']
2065                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2066                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2067
2068            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2069            # Now get access to the dynamic testbeds
2070            for k, t in topo.items():
2071                if not t.get_attribute('dynamic'):
2072                    continue
2073                tb = t.get_attribute('testbed')
2074                if tb: 
2075                    self.get_access(tb, None, user, tbparams, master, 
2076                            export_project, access_user)
2077                    tbparams[k] = tbparams[tb]
2078                    del tbparams[tb]
2079                    allocated[k] = 1
2080                else:
2081                    raise service_error(service_error.internal, 
2082                            "Dynamic allocation from no testbed!?")
2083
2084            self.wrangle_software(expid, top, topo, tbparams)
2085
2086            vtopo = topdl.topology_to_vtopo(top)
2087            vis = self.genviz(vtopo)
2088
2089            # save federant information
2090            for k in allocated.keys():
2091                tbparams[k]['federant'] = {
2092                        'name': [ { 'localname' : eid} ],
2093                        'allocID' : tbparams[k]['allocID'],
2094                        'master' : k == master,
2095                        'uri': tbparams[k]['uri'],
2096                    }
2097                if tbparams[k].has_key('emulab'):
2098                        tbparams[k]['federant']['emulab'] = \
2099                                tbparams[k]['emulab']
2100
2101            self.state_lock.acquire()
2102            self.state[eid]['vtopo'] = vtopo
2103            self.state[eid]['vis'] = vis
2104            self.state[expid]['federant'] = \
2105                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2106                        if tbparams[tb].has_key('federant') ]
2107            if self.state_filename: 
2108                self.write_state()
2109            self.state_lock.release()
2110        except service_error, e:
2111            # If something goes wrong in the parse (usually an access error)
2112            # clear the placeholder state.  From here on out the code delays
2113            # exceptions.  Failing at this point returns a fault to the remote
2114            # caller.
2115
2116            self.state_lock.acquire()
2117            del self.state[eid]
2118            del self.state[expid]
2119            if self.state_filename: self.write_state()
2120            self.state_lock.release()
2121            raise e
2122
2123
2124        # Start the background swapper and return the starting state.  From
2125        # here on out, the state will stick around a while.
2126
2127        # Let users touch the state
2128        self.auth.set_attribute(fid, expid)
2129        self.auth.set_attribute(expid, expid)
2130        # Override fedids can manipulate state as well
2131        for o in self.overrides:
2132            self.auth.set_attribute(o, expid)
2133
2134        # Create a logger that logs to the experiment's state object as well as
2135        # to the main log file.
2136        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2137        alloc_collector = self.list_log(self.state[eid]['log'])
2138        h = logging.StreamHandler(alloc_collector)
2139        # XXX: there should be a global one of these rather than repeating the
2140        # code.
2141        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2142                    '%d %b %y %H:%M:%S'))
2143        alloc_log.addHandler(h)
2144       
2145        attrs = [ 
2146                {
2147                    'attribute': 'ssh_pubkey', 
2148                    'value': '%s/%s/config/%s' % \
2149                            (self.repo_url, expid, gw_pubkey_base)
2150                },
2151                {
2152                    'attribute': 'ssh_secretkey', 
2153                    'value': '%s/%s/config/%s' % \
2154                            (self.repo_url, expid, gw_secretkey_base)
2155                },
2156                {
2157                    'attribute': 'hosts', 
2158                    'value': '%s/%s/config/hosts' % \
2159                            (self.repo_url, expid)
2160                },
2161                {
2162                    'attribute': 'experiment_name',
2163                    'value': eid,
2164                },
2165            ]
2166
2167        # Start a thread to do the resource allocation
2168        t  = Thread(target=self.allocate_resources,
2169                args=(allocated, master, eid, expid, tbparams, 
2170                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2171                name=eid)
2172        t.start()
2173
2174        rv = {
2175                'experimentID': [
2176                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2177                ],
2178                'experimentStatus': 'starting',
2179            }
2180
2181        return rv
2182   
2183    def get_experiment_fedid(self, key):
2184        """
2185        find the fedid associated with the localname key in the state database.
2186        """
2187
2188        rv = None
2189        self.state_lock.acquire()
2190        if self.state.has_key(key):
2191            if isinstance(self.state[key], dict):
2192                try:
2193                    kl = [ f['fedid'] for f in \
2194                            self.state[key]['experimentID']\
2195                                if f.has_key('fedid') ]
2196                except KeyError:
2197                    self.state_lock.release()
2198                    raise service_error(service_error.internal, 
2199                            "No fedid for experiment %s when getting "+\
2200                                    "fedid(!?)" % key)
2201                if len(kl) == 1:
2202                    rv = kl[0]
2203                else:
2204                    self.state_lock.release()
2205                    raise service_error(service_error.internal, 
2206                            "multiple fedids for experiment %s when " +\
2207                                    "getting fedid(!?)" % key)
2208            else:
2209                self.state_lock.release()
2210                raise service_error(service_error.internal, 
2211                        "Unexpected state for %s" % key)
2212        self.state_lock.release()
2213        return rv
2214
2215    def check_experiment_access(self, fid, key):
2216        """
2217        Confirm that the fid has access to the experiment.  Though a request
2218        may be made in terms of a local name, the access attribute is always
2219        the experiment's fedid.
2220        """
2221        if not isinstance(key, fedid):
2222            key = self.get_experiment_fedid(key)
2223
2224        if self.auth.check_attribute(fid, key):
2225            return True
2226        else:
2227            raise service_error(service_error.access, "Access Denied")
2228
2229
2230    def get_handler(self, path, fid):
2231        self.log.info("Get handler %s %s" % (path, fid))
2232        if self.auth.check_attribute(fid, path):
2233            return ("%s/%s" % (self.repodir, path), "application/binary")
2234        else:
2235            return (None, None)
2236
2237    def get_vtopo(self, req, fid):
2238        """
2239        Return the stored virtual topology for this experiment
2240        """
2241        rv = None
2242        state = None
2243
2244        req = req.get('VtopoRequestBody', None)
2245        if not req:
2246            raise service_error(service_error.req,
2247                    "Bad request format (no VtopoRequestBody)")
2248        exp = req.get('experiment', None)
2249        if exp:
2250            if exp.has_key('fedid'):
2251                key = exp['fedid']
2252                keytype = "fedid"
2253            elif exp.has_key('localname'):
2254                key = exp['localname']
2255                keytype = "localname"
2256            else:
2257                raise service_error(service_error.req, "Unknown lookup type")
2258        else:
2259            raise service_error(service_error.req, "No request?")
2260
2261        self.check_experiment_access(fid, key)
2262
2263        self.state_lock.acquire()
2264        if self.state.has_key(key):
2265            if self.state[key].has_key('vtopo'):
2266                rv = { 'experiment' : {keytype: key },\
2267                        'vtopo': self.state[key]['vtopo'],\
2268                    }
2269            else:
2270                state = self.state[key]['experimentStatus']
2271        self.state_lock.release()
2272
2273        if rv: return rv
2274        else: 
2275            if state:
2276                raise service_error(service_error.partial, 
2277                        "Not ready: %s" % state)
2278            else:
2279                raise service_error(service_error.req, "No such experiment")
2280
2281    def get_vis(self, req, fid):
2282        """
2283        Return the stored visualization for this experiment
2284        """
2285        rv = None
2286        state = None
2287
2288        req = req.get('VisRequestBody', None)
2289        if not req:
2290            raise service_error(service_error.req,
2291                    "Bad request format (no VisRequestBody)")
2292        exp = req.get('experiment', None)
2293        if exp:
2294            if exp.has_key('fedid'):
2295                key = exp['fedid']
2296                keytype = "fedid"
2297            elif exp.has_key('localname'):
2298                key = exp['localname']
2299                keytype = "localname"
2300            else:
2301                raise service_error(service_error.req, "Unknown lookup type")
2302        else:
2303            raise service_error(service_error.req, "No request?")
2304
2305        self.check_experiment_access(fid, key)
2306
2307        self.state_lock.acquire()
2308        if self.state.has_key(key):
2309            if self.state[key].has_key('vis'):
2310                rv =  { 'experiment' : {keytype: key },\
2311                        'vis': self.state[key]['vis'],\
2312                        }
2313            else:
2314                state = self.state[key]['experimentStatus']
2315        self.state_lock.release()
2316
2317        if rv: return rv
2318        else:
2319            if state:
2320                raise service_error(service_error.partial, 
2321                        "Not ready: %s" % state)
2322            else:
2323                raise service_error(service_error.req, "No such experiment")
2324
2325    def clean_info_response(self, rv):
2326        """
2327        Remove the information in the experiment's state object that is not in
2328        the info response.
2329        """
2330        # Remove the owner info (should always be there, but...)
2331        if rv.has_key('owner'): del rv['owner']
2332
2333        # Convert the log into the allocationLog parameter and remove the
2334        # log entry (with defensive programming)
2335        if rv.has_key('log'):
2336            rv['allocationLog'] = "".join(rv['log'])
2337            del rv['log']
2338        else:
2339            rv['allocationLog'] = ""
2340
2341        if rv['experimentStatus'] != 'active':
2342            if rv.has_key('federant'): del rv['federant']
2343        else:
2344            # remove the allocationID and uri info from each federant
2345            for f in rv.get('federant', []):
2346                if f.has_key('allocID'): del f['allocID']
2347                if f.has_key('uri'): del f['uri']
2348        return rv
2349
2350    def get_info(self, req, fid):
2351        """
2352        Return all the stored info about this experiment
2353        """
2354        rv = None
2355
2356        req = req.get('InfoRequestBody', None)
2357        if not req:
2358            raise service_error(service_error.req,
2359                    "Bad request format (no InfoRequestBody)")
2360        exp = req.get('experiment', None)
2361        if exp:
2362            if exp.has_key('fedid'):
2363                key = exp['fedid']
2364                keytype = "fedid"
2365            elif exp.has_key('localname'):
2366                key = exp['localname']
2367                keytype = "localname"
2368            else:
2369                raise service_error(service_error.req, "Unknown lookup type")
2370        else:
2371            raise service_error(service_error.req, "No request?")
2372
2373        self.check_experiment_access(fid, key)
2374
2375        # The state may be massaged by the service function that called
2376        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2377        # state.
2378        self.state_lock.acquire()
2379        if self.state.has_key(key):
2380            rv = copy.deepcopy(self.state[key])
2381        self.state_lock.release()
2382
2383        if rv:
2384            return self.clean_info_response(rv)
2385        else:
2386            raise service_error(service_error.req, "No such experiment")
2387
2388    def get_multi_info(self, req, fid):
2389        """
2390        Return all the stored info that this fedid can access
2391        """
2392        rv = { 'info': [ ] }
2393
2394        self.state_lock.acquire()
2395        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2396            try:
2397                self.check_experiment_access(fid, key)
2398            except service_error, e:
2399                if e.code == service_error.access:
2400                    continue
2401                else:
2402                    self.state_lock.release()
2403                    raise e
2404
2405            if self.state.has_key(key):
2406                e = copy.deepcopy(self.state[key])
2407                e = self.clean_info_response(e)
2408                rv['info'].append(e)
2409        self.state_lock.release()
2410        return rv
2411
2412    def terminate_experiment(self, req, fid):
2413        """
2414        Swap this experiment out on the federants and delete the shared
2415        information
2416        """
2417        tbparams = { }
2418        req = req.get('TerminateRequestBody', None)
2419        if not req:
2420            raise service_error(service_error.req,
2421                    "Bad request format (no TerminateRequestBody)")
2422        force = req.get('force', False)
2423        exp = req.get('experiment', None)
2424        if exp:
2425            if exp.has_key('fedid'):
2426                key = exp['fedid']
2427                keytype = "fedid"
2428            elif exp.has_key('localname'):
2429                key = exp['localname']
2430                keytype = "localname"
2431            else:
2432                raise service_error(service_error.req, "Unknown lookup type")
2433        else:
2434            raise service_error(service_error.req, "No request?")
2435
2436        self.check_experiment_access(fid, key)
2437
2438        dealloc_list = [ ]
2439
2440
2441        # Create a logger that logs to the dealloc_list as well as to the main
2442        # log file.
2443        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2444        h = logging.StreamHandler(self.list_log(dealloc_list))
2445        # XXX: there should be a global one of these rather than repeating the
2446        # code.
2447        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2448                    '%d %b %y %H:%M:%S'))
2449        dealloc_log.addHandler(h)
2450
2451        self.state_lock.acquire()
2452        fed_exp = self.state.get(key, None)
2453
2454        if fed_exp:
2455            # This branch of the conditional holds the lock to generate a
2456            # consistent temporary tbparams variable to deallocate experiments.
2457            # It releases the lock to do the deallocations and reacquires it to
2458            # remove the experiment state when the termination is complete.
2459
2460            # First make sure that the experiment creation is complete.
2461            status = fed_exp.get('experimentStatus', None)
2462
2463            if status:
2464                if status in ('starting', 'terminating'):
2465                    if not force:
2466                        self.state_lock.release()
2467                        raise service_error(service_error.partial, 
2468                                'Experiment still being created or destroyed')
2469                    else:
2470                        self.log.warning('Experiment in %s state ' % status + \
2471                                'being terminated by force.')
2472            else:
2473                # No status??? trouble
2474                self.state_lock.release()
2475                raise service_error(service_error.internal,
2476                        "Experiment has no status!?")
2477
2478            ids = []
2479            #  experimentID is a list of dicts that are self-describing
2480            #  identifiers.  This finds all the fedids and localnames - the
2481            #  keys of self.state - and puts them into ids.
2482            for id in fed_exp.get('experimentID', []):
2483                if id.has_key('fedid'): ids.append(id['fedid'])
2484                if id.has_key('localname'): ids.append(id['localname'])
2485
2486            # Collect the allocation/segment ids into a dict keyed by the fedid
2487            # of the allocation (or a monotonically increasing integer) that
2488            # contains a tuple of uri, aid (which is a dict...)
2489            for i, fed in enumerate(fed_exp.get('federant', [])):
2490                try:
2491                    uri = fed['uri']
2492                    aid = fed['allocID']
2493                    k = fed['allocID'].get('fedid', i)
2494                except KeyError, e:
2495                    continue
2496                tbparams[k] = (uri, aid)
2497            fed_exp['experimentStatus'] = 'terminating'
2498            if self.state_filename: self.write_state()
2499            self.state_lock.release()
2500
2501            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2502            # then completes, so we can't wait if nothing starts.  So, no
2503            # tbparams, no start.
2504            if len(tbparams) > 0:
2505                thread_pool = self.thread_pool(self.nthreads)
2506                for k in tbparams.keys():
2507                    # Create and start a thread to stop the segment
2508                    thread_pool.wait_for_slot()
2509                    uri, aid = tbparams[k]
2510                    t  = self.pooled_thread(\
2511                            target=self.terminate_segment(log=dealloc_log,
2512                                testbed=uri,
2513                                cert_file=self.cert_file, 
2514                                cert_pwd=self.cert_pwd,
2515                                trusted_certs=self.trusted_certs,
2516                                caller=self.call_TerminateSegment),
2517                            args=(uri, aid), name=k,
2518                            pdata=thread_pool, trace_file=self.trace_file)
2519                    t.start()
2520                # Wait for completions
2521                thread_pool.wait_for_all_done()
2522
2523            # release the allocations (failed experiments have done this
2524            # already, and starting experiments may be in odd states, so we
2525            # ignore errors releasing those allocations
2526            try: 
2527                for k in tbparams.keys():
2528                    # This releases access by uri
2529                    uri, aid = tbparams[k]
2530                    self.release_access(None, aid, uri=uri)
2531            except service_error, e:
2532                if status != 'failed' and not force:
2533                    raise e
2534
2535            # Remove the terminated experiment
2536            self.state_lock.acquire()
2537            for id in ids:
2538                if self.state.has_key(id): del self.state[id]
2539
2540            if self.state_filename: self.write_state()
2541            self.state_lock.release()
2542
2543            return { 
2544                    'experiment': exp , 
2545                    'deallocationLog': "".join(dealloc_list),
2546                    }
2547        else:
2548            # Don't forget to release the lock
2549            self.state_lock.release()
2550            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.