source: fedd/federation/experiment_control.py @ 617592b

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 617592b was 617592b, checked in by Ted Faber <faber@…>, 15 years ago

More dragon cleanup. Config files look right, but need to be tested.

  • Property mode set to 100644
File size: 88.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, tbparam, master, export_project,
728            access_user, services):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # Tweak search order so that if there are entries in access_user that
738        # have a project matching the export project, we try them first
739        if export_project and export_project.has_key('localname'): 
740            pn = export_project['localname'] 
741               
742            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
743            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
744        else: 
745            access_sequence = access_user
746
747        for p, u in access_sequence: 
748            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
749                    "to %s") %  ((p or "None"), u, uri))
750
751            if p:
752                # Request with user and project specified
753                req = {\
754                        'destinationTestbed' : { 'uri' : uri },
755                        'credential': [ "project: %s" % p, "user: %s"  % u],
756                        'allocID' : { 'localname': 'test' },
757                    }
758            else:
759                # Request with only user specified
760                req = {\
761                        'destinationTestbed' : { 'uri' : uri },
762                        'credential': [ 'user: %s' % u ],
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771                req['service'] = [
772                        { 'name': 'userconfig', 'visibility': 'export'},
773                        { 'name': 'SMB', 'visibility': 'export'},
774                        { 'name': 'seer', 'visibility': 'export'},
775                        { 'name': 'tmcd', 'visibility': 'export'},
776                    ]
777
778            # node resources if any
779            if nodes != None and len(nodes) > 0:
780                rnodes = [ ]
781                for n in nodes:
782                    rn = { }
783                    image, hw, count = n.split(":")
784                    if image: rn['image'] = [ image ]
785                    if hw: rn['hardware'] = [ hw ]
786                    if count and int(count) >0 : rn['count'] = int(count)
787                    rnodes.append(rn)
788                req['resources']= { }
789                req['resources']['node'] = rnodes
790
791            try:
792                if self.local_access.has_key(uri):
793                    # Local access call
794                    req = { 'RequestAccessRequestBody' : req }
795                    r = self.local_access[uri].RequestAccess(req, 
796                            fedid(file=self.cert_file))
797                    r = { 'RequestAccessResponseBody' : r }
798                else:
799                    r = self.call_RequestAccess(uri, req, 
800                            self.cert_file, self.cert_pwd, self.trusted_certs)
801            except service_error, e:
802                if e.code == service_error.access:
803                    self.log.debug("[get_access] Access denied")
804                    r = None
805                    continue
806                else:
807                    raise e
808
809            if r.has_key('RequestAccessResponseBody'):
810                # Through to here we have a valid response, not a fault.
811                # Access denied is a fault, so something better or worse than
812                # access denied has happened.
813                r = r['RequestAccessResponseBody']
814                self.log.debug("[get_access] Access granted")
815                break
816            else:
817                raise service_error(service_error.protocol,
818                        "Bad proxy response")
819       
820        if not r:
821            raise service_error(service_error.access, 
822                    "Access denied by %s (%s)" % (tb, uri))
823
824        tbparam[tb] = { 
825                "allocID" : r['allocID'],
826                "uri": uri,
827                }
828        if 'service' in r: 
829            services.extend(r['service'])
830
831        # Add attributes to parameter space.  We don't allow attributes to
832        # overlay any parameters already installed.
833        for a in r.get('fedAttr', []):
834            try:
835                if a['attribute'] and \
836                        isinstance(a['attribute'], basestring)\
837                        and not tbparam[tb].has_key(a['attribute'].lower()):
838                    tbparam[tb][a['attribute'].lower()] = a['value']
839            except KeyError:
840                self.log.error("Bad attribute in response: %s" % a)
841
842    def release_access(self, tb, aid, uri=None):
843        """
844        Release access to testbed through fedd
845        """
846
847        if not uri:
848            uri = self.tbmap.get(tb, None)
849        if not uri:
850            raise service_error(service_error.server_config, 
851                    "Unknown testbed: %s" % tb)
852
853        if self.local_access.has_key(uri):
854            resp = self.local_access[uri].ReleaseAccess(\
855                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
856                    fedid(file=self.cert_file))
857            resp = { 'ReleaseAccessResponseBody': resp } 
858        else:
859            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
860                    self.cert_file, self.cert_pwd, self.trusted_certs)
861
862        # better error coding
863
864    def remote_splitter(self, uri, desc, master):
865
866        req = {
867                'description' : { 'ns2description': desc },
868                'master': master,
869                'include_fedkit': bool(self.fedkit),
870                'include_gatewaykit': bool(self.gatewaykit)
871            }
872
873        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
874                self.trusted_certs)
875
876        if r.has_key('Ns2SplitResponseBody'):
877            r = r['Ns2SplitResponseBody']
878            if r.has_key('output'):
879                return r['output'].splitlines()
880            else:
881                raise service_error(service_error.protocol, 
882                        "Bad splitter response (no output)")
883        else:
884            raise service_error(service_error.protocol, "Bad splitter response")
885
886    class start_segment:
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None,
889                log_collector=None):
890            self.log = log
891            self.debug = debug
892            self.cert_file = cert_file
893            self.cert_pwd = cert_pwd
894            self.trusted_certs = None
895            self.caller = caller
896            self.testbed = testbed
897            self.log_collector = log_collector
898            self.response = None
899
900        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
901                services=None):
902            req = {
903                    'allocID': { 'fedid' : aid }, 
904                    'segmentdescription': { 
905                        'topdldescription': topo.to_dict(),
906                    },
907                    'master': master,
908                }
909
910            if connInfo:
911                req['connection'] = connInfo
912            # Add services to request.  The master exports, everyone else
913            # imports.
914            if services:
915                svcs = [ x.copy() for x in services]
916                for s in svcs:
917                    if master: s['visibility'] = 'export'
918                    else: s['visibility'] = 'import'
919                req['service'] = svcs
920            if attrs:
921                req['fedAttr'] = attrs
922
923            try:
924                self.log.debug("Calling StartSegment at %s " % uri)
925                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
926                        self.trusted_certs)
927                if r.has_key('StartSegmentResponseBody'):
928                    lval = r['StartSegmentResponseBody'].get('allocationLog',
929                            None)
930                    if lval and self.log_collector:
931                        for line in  lval.splitlines(True):
932                            self.log_collector.write(line)
933                    self.response = r
934                else:
935                    raise service_error(service_error.internal, 
936                            "Bad response!?: %s" %r)
937                return True
938            except service_error, e:
939                self.log.error("Start segment failed on %s: %s" % \
940                        (self.testbed, e))
941                return False
942
943
944
945    class terminate_segment:
946        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
947                cert_pwd=None, trusted_certs=None, caller=None):
948            self.log = log
949            self.debug = debug
950            self.cert_file = cert_file
951            self.cert_pwd = cert_pwd
952            self.trusted_certs = None
953            self.caller = caller
954            self.testbed = testbed
955
956        def __call__(self, uri, aid ):
957            req = {
958                    'allocID': aid , 
959                }
960            try:
961                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
962                        self.trusted_certs)
963                return True
964            except service_error, e:
965                self.log.error("Terminate segment failed on %s: %s" % \
966                        (self.testbed, e))
967                return False
968   
969
970    def allocate_resources(self, allocated, master, eid, expid, 
971            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
972            attrs=None, connInfo={}, services=[]):
973        def get_vlan(r):
974            if r.has_key('StartSegmentResponseBody'):
975                srb = r['StartSegmentResponseBody']
976                if srb.has_key('fedAttr'):
977                    for k, v in [ (a['attribute'], a['value']) \
978                            for a in srb['fedAttr']]:
979                        if k == 'vlan': return v
980            return None
981
982        started = { }           # Testbeds where a sub-experiment started
983                                # successfully
984
985        # XXX
986        fail_soft = False
987
988        non_transit = [ k for k in allocated.keys() \
989                if not topo[k].get_attribute('transit')]
990        transit = [ k for k in allocated.keys() \
991                if topo[k].get_attribute('transit')]
992
993        log = alloc_log or self.log
994
995        thread_pool = self.thread_pool(self.nthreads)
996        threads = [ ]
997
998        for tb in transit:
999            uri = tbparams[tb]['uri']
1000            if tbparams[tb].has_key('allocID') and \
1001                    tbparams[tb]['allocID'].has_key('fedid'):
1002                aid = tbparams[tb]['allocID']['fedid']
1003            else:
1004                raise service_error(service_error.internal, 
1005                        "No alloc id for testbed %s !?" % tb)
1006
1007            m = re.search('(\d+)', tb)
1008            if m:
1009                to_repl = "unassigned%s" % m.group(1)
1010            else:
1011                raise service_error(service_error.internal, 
1012                        "Bad dynamic allocation name")
1013                break
1014
1015            ss = self.start_segment(log=log, debug=self.debug, 
1016                testbed=tb, cert_file=self.cert_file, 
1017                cert_pwd=self.cert_pwd, 
1018                trusted_certs=self.trusted_certs,
1019                caller=self.call_StartSegment,
1020                log_collector=log_collector)
1021            t = self.pooled_thread(
1022                    target=ss,
1023                    args =(uri, aid, topo[tb], False, attrs, connInfo[tb],
1024                        services),
1025                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1026            threads.append(t)
1027            t.start()
1028            # Wait until the this transit node finishes (keep pinging the log,
1029            # though)
1030
1031            mins = 0
1032            while not thread_pool.wait_for_all_done(60.0):
1033                mins += 1
1034                alloc_log.info("Waiting for transit (it has been %d mins)" \
1035                        % mins)
1036
1037            if t.rv:
1038                vlan = get_vlan(ss.response)
1039                if vlan is not None:
1040                    for v in connInfo.values():
1041                        for i in v:
1042                            for a in i.get('fedAttr', []):
1043                                if a.get('attribute', "") == 'vlan_id' and \
1044                                        a.get('value', "") == to_repl:
1045                                    a['value'] = vlan
1046            else:
1047                break
1048            thread_pool.clear()
1049
1050
1051        failed = [ t.getName() for t in threads if not t.rv ]
1052
1053        if len(failed) == 0:
1054            for tb in non_transit:
1055                # Create and start a thread to start the segment, and save it
1056                # to get the return value later
1057                thread_pool.wait_for_slot()
1058                uri = self.tbmap.get(tb, None)
1059                if not uri:
1060                    raise service_error(service_error.internal, 
1061                            "Unknown testbed %s !?" % tb)
1062
1063                if tbparams[tb].has_key('allocID') and \
1064                        tbparams[tb]['allocID'].has_key('fedid'):
1065                    aid = tbparams[tb]['allocID']['fedid']
1066                else:
1067                    raise service_error(service_error.internal, 
1068                            "No alloc id for testbed %s !?" % tb)
1069
1070                t  = self.pooled_thread(\
1071                        target=self.start_segment(log=log, debug=self.debug,
1072                            testbed=tb, cert_file=self.cert_file,
1073                            cert_pwd=self.cert_pwd,
1074                            trusted_certs=self.trusted_certs,
1075                            caller=self.call_StartSegment,
1076                            log_collector=log_collector), 
1077                        args=(uri, aid, topo[tb], tb == master, 
1078                            attrs, connInfo[tb], services),
1079                        name=tb,
1080                        pdata=thread_pool, trace_file=self.trace_file)
1081                threads.append(t)
1082                t.start()
1083
1084            # Wait until all finish (keep pinging the log, though)
1085            mins = 0
1086            while not thread_pool.wait_for_all_done(60.0):
1087                mins += 1
1088                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                        % mins)
1090
1091            thread_pool.clear()
1092
1093        failed = [ t.getName() for t in threads if not t.rv ]
1094        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1095
1096        # If one failed clean up, unless fail_soft is set
1097        if failed:
1098            if not fail_soft:
1099                thread_pool.clear()
1100                for tb in succeeded:
1101                    # Create and start a thread to stop the segment
1102                    thread_pool.wait_for_slot()
1103                    uri = tbparams[tb]['uri']
1104                    t  = self.pooled_thread(\
1105                            target=self.terminate_segment(log=log,
1106                                testbed=tb,
1107                                cert_file=self.cert_file, 
1108                                cert_pwd=self.cert_pwd,
1109                                trusted_certs=self.trusted_certs,
1110                                caller=self.call_TerminateSegment),
1111                            args=(uri, tbparams[tb]['federant']['allocID']),
1112                            name=tb,
1113                            pdata=thread_pool, trace_file=self.trace_file)
1114                    t.start()
1115                # Wait until all finish
1116                thread_pool.wait_for_all_done()
1117
1118                # release the allocations
1119                for tb in tbparams.keys():
1120                    self.release_access(tb, tbparams[tb]['allocID'],
1121                            tbparams[tb].get('uri', None))
1122                # Remove the placeholder
1123                self.state_lock.acquire()
1124                self.state[eid]['experimentStatus'] = 'failed'
1125                if self.state_filename: self.write_state()
1126                self.state_lock.release()
1127
1128                log.error("Swap in failed on %s" % ",".join(failed))
1129                return
1130        else:
1131            log.info("[start_segment]: Experiment %s active" % eid)
1132
1133
1134        # Walk up tmpdir, deleting as we go
1135        if self.cleanup:
1136            log.debug("[start_experiment]: removing %s" % tmpdir)
1137            for path, dirs, files in os.walk(tmpdir, topdown=False):
1138                for f in files:
1139                    os.remove(os.path.join(path, f))
1140                for d in dirs:
1141                    os.rmdir(os.path.join(path, d))
1142            os.rmdir(tmpdir)
1143        else:
1144            log.debug("[start_experiment]: not removing %s" % tmpdir)
1145
1146        # Insert the experiment into our state and update the disk copy
1147        self.state_lock.acquire()
1148        self.state[expid]['experimentStatus'] = 'active'
1149        self.state[eid] = self.state[expid]
1150        if self.state_filename: self.write_state()
1151        self.state_lock.release()
1152        return
1153
1154
1155    def add_kit(self, e, kit):
1156        """
1157        Add a Software object created from the list of (install, location)
1158        tuples passed as kit  to the software attribute of an object e.  We
1159        do this enough to break out the code, but it's kind of a hack to
1160        avoid changing the old tuple rep.
1161        """
1162
1163        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1164
1165        if isinstance(e.software, list): e.software.extend(s)
1166        else: e.software = s
1167
1168
1169    def create_experiment_state(self, fid, req, expid, expcert, 
1170            state='starting'):
1171        """
1172        Create the initial entry in the experiment's state.  The expid and
1173        expcert are the experiment's fedid and certifacte that represents that
1174        ID, which are installed in the experiment state.  If the request
1175        includes a suggested local name that is used if possible.  If the local
1176        name is already taken by an experiment owned by this user that has
1177        failed, it is overwritten.  Otherwise new letters are added until a
1178        valid localname is found.  The generated local name is returned.
1179        """
1180
1181        if req.has_key('experimentID') and \
1182                req['experimentID'].has_key('localname'):
1183            overwrite = False
1184            eid = req['experimentID']['localname']
1185            # If there's an old failed experiment here with the same local name
1186            # and accessible by this user, we'll overwrite it, otherwise we'll
1187            # fall through and do the collision avoidance.
1188            old_expid = self.get_experiment_fedid(eid)
1189            if old_expid and self.check_experiment_access(fid, old_expid):
1190                self.state_lock.acquire()
1191                status = self.state[eid].get('experimentStatus', None)
1192                if status and status == 'failed':
1193                    # remove the old access attribute
1194                    self.auth.unset_attribute(fid, old_expid)
1195                    overwrite = True
1196                    del self.state[eid]
1197                    del self.state[old_expid]
1198                self.state_lock.release()
1199            self.state_lock.acquire()
1200            while (self.state.has_key(eid) and not overwrite):
1201                eid += random.choice(string.ascii_letters)
1202            # Initial state
1203            self.state[eid] = {
1204                    'experimentID' : \
1205                            [ { 'localname' : eid }, {'fedid': expid } ],
1206                    'experimentStatus': state,
1207                    'experimentAccess': { 'X509' : expcert },
1208                    'owner': fid,
1209                    'log' : [],
1210                }
1211            self.state[expid] = self.state[eid]
1212            if self.state_filename: self.write_state()
1213            self.state_lock.release()
1214        else:
1215            eid = self.exp_stem
1216            for i in range(0,5):
1217                eid += random.choice(string.ascii_letters)
1218            self.state_lock.acquire()
1219            while (self.state.has_key(eid)):
1220                eid = self.exp_stem
1221                for i in range(0,5):
1222                    eid += random.choice(string.ascii_letters)
1223            # Initial state
1224            self.state[eid] = {
1225                    'experimentID' : \
1226                            [ { 'localname' : eid }, {'fedid': expid } ],
1227                    'experimentStatus': state,
1228                    'experimentAccess': { 'X509' : expcert },
1229                    'owner': fid,
1230                    'log' : [],
1231                }
1232            self.state[expid] = self.state[eid]
1233            if self.state_filename: self.write_state()
1234            self.state_lock.release()
1235
1236        return eid
1237
1238
1239    def allocate_ips_to_topo(self, top):
1240        """
1241        Add an ip4_address attribute to all the hosts in the topology, based on
1242        the shared substrates on which they sit.  An /etc/hosts file is also
1243        created and returned as a list of hostfiles entries.  We also return
1244        the allocator, because we may need to allocate IPs to portals
1245        (specifically DRAGON portals).
1246        """
1247        subs = sorted(top.substrates, 
1248                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1249                reverse=True)
1250        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1251        ifs = { }
1252        hosts = [ ]
1253
1254        for idx, s in enumerate(subs):
1255            a = ips.allocate(len(s.interfaces)+2)
1256            if a :
1257                base, num = a
1258                if num < len(s.interfaces) +2 : 
1259                    raise service_error(service_error.internal,
1260                            "Allocator returned wrong number of IPs??")
1261            else:
1262                raise service_error(service_error.req, 
1263                        "Cannot allocate IP addresses")
1264
1265            base += 1
1266            for i in s.interfaces:
1267                i.attribute.append(
1268                        topdl.Attribute('ip4_address', 
1269                            "%s" % ip_addr(base)))
1270                hname = i.element.name[0]
1271                if ifs.has_key(hname):
1272                    hosts.append("%s\t%s-%s %s-%d" % \
1273                            (ip_addr(base), hname, s.name, hname,
1274                                ifs[hname]))
1275                else:
1276                    ifs[hname] = 0
1277                    hosts.append("%s\t%s-%s %s-%d %s" % \
1278                            (ip_addr(base), hname, s.name, hname,
1279                                ifs[hname], hname))
1280
1281                ifs[hname] += 1
1282                base += 1
1283        return hosts, ips
1284
1285    def get_access_to_testbeds(self, testbeds, access_user, 
1286            export_project, master, allocated, tbparams, services):
1287        """
1288        Request access to the various testbeds required for this instantiation
1289        (passed in as testbeds).  User, access_user, expoert_project and master
1290        are used to construct the correct requests.  Per-testbed parameters are
1291        returned in tbparams.
1292        """
1293        for tb in testbeds:
1294            self.get_access(tb, None, tbparams, master,
1295                    export_project, access_user, services)
1296            allocated[tb] = 1
1297
1298    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1299        """
1300        Create the sub-topologies that are needed for experiment instantiation.
1301        """
1302        for tb in testbeds:
1303            topo[tb] = top.clone()
1304            to_delete = [ ]
1305            # XXX: copy in for loop to simplify
1306            for e in topo[tb].elements:
1307                etb = e.get_attribute('testbed')
1308                if etb and etb != tb:
1309                    for i in e.interface:
1310                        for s in i.subs:
1311                            try:
1312                                s.interfaces.remove(i)
1313                            except ValueError:
1314                                raise service_error(service_error.internal,
1315                                        "Can't remove interface??")
1316                    to_delete.append(e)
1317            for e in to_delete:
1318                topo[tb].elements.remove(e)
1319            topo[tb].make_indices()
1320
1321            for e in [ e for e in topo[tb].elements \
1322                    if isinstance(e,topdl.Computer)]:
1323                if self.fedkit: self.add_kit(e, self.fedkit)
1324
1325    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1326            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[]):
1327        """
1328        Return a new internet portal node and a dict with the connectionInfo to
1329        be attached.
1330        """
1331        dproject = tbparams[dt].get('project', 'project')
1332        ddomain = tbparams[dt].get('domain', ".example.com")
1333        mdomain = tbparams[master].get('domain', '.example.com')
1334        mproject = tbparams[master].get('project', 'project')
1335        muser = tbparams[master].get('user', 'root')
1336        smbshare = tbparams[master].get('smbshare', 'USERS')
1337
1338        if st == master or dt == master:
1339            active = ("%s" % (st == master))
1340        else:
1341            active = ("%s" % (st > dt))
1342
1343        ifaces = [ ]
1344        for sub, attrs in iface_desc:
1345            inf = topdl.Interface(
1346                    name="inf%03d" % len(ifaces),
1347                    substrate=sub,
1348                    attribute=[
1349                        topdl.Attribute(
1350                            attribute=n,
1351                            value = v)
1352                        for n, v in attrs
1353                        ]
1354                    )
1355            ifaces.append(inf)
1356        if conn_type == "ssh":
1357            info = {
1358                    "type" : conn_type, 
1359                    "portal": myname,
1360                    'peer': desthost,
1361                    'fedAttr': [ 
1362                            { 'attribute': 'masterdomain', 'value': mdomain},
1363                            { 'attribute': 'masterexperiment', 'value': 
1364                                "%s/%s" % (mproject, eid)},
1365                            { 'attribute': 'active', 'value': active},
1366                            # Move to SMB service description
1367                            { 'attribute': 'masteruser', 'value': muser},
1368                            { 'attribute': 'smbshare', 'value': smbshare},
1369                        ],
1370                    }
1371        else:
1372            info = None
1373
1374        return (topdl.Computer(
1375                name=myname,
1376                attribute=[ 
1377                    topdl.Attribute(attribute=n,value=v)
1378                        for n, v in (\
1379                            ('portal', 'true'),
1380                            ('portal_type', portal_type), 
1381                        )
1382                    ],
1383                interface=ifaces,
1384                ), info)
1385
1386    def new_portal_substrate(self, st, dt, eid, tbparams):
1387        ddomain = tbparams[dt].get('domain', ".example.com")
1388        dproject = tbparams[dt].get('project', 'project')
1389        tsubstrate = \
1390                topdl.Substrate(name='%s-%s' % (st, dt),
1391                        attribute= [
1392                            topdl.Attribute(
1393                                attribute='portal',
1394                                value='true')
1395                            ]
1396                        )
1397        segment_element = topdl.Segment(
1398                id= tbparams[dt]['allocID'],
1399                type='emulab',
1400                uri = self.tbmap.get(dt, None),
1401                interface=[ 
1402                    topdl.Interface(
1403                        substrate=tsubstrate.name),
1404                    ],
1405                attribute = [
1406                    topdl.Attribute(attribute=n, value=v)
1407                        for n, v in (\
1408                            ('domain', ddomain),
1409                            ('experiment', "%s/%s" % \
1410                                    (dproject, eid)),)
1411                    ],
1412                )
1413
1414        return (tsubstrate, segment_element)
1415
1416    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1417        if sub.capacity is None:
1418            raise service_error(service_error.internal,
1419                    "Cannot DRAGON split substrate w/o capacity")
1420        segs = [ ]
1421        substr = topdl.Substrate(name="dragon%d" % idx, 
1422                capacity=sub.capacity.clone(),
1423                attribute=[ topdl.Attribute(attribute=n, value=v)
1424                    for n, v, in (\
1425                            ('vlan', 'unassigned%d' % idx),)])
1426        for tb in tbs.keys():
1427            seg = topdl.Segment(
1428                    id = tbparams[tb]['allocID'],
1429                    type='emulab',
1430                    uri = self.tbmap.get(tb, None),
1431                    interface=[ 
1432                        topdl.Interface(
1433                            substrate=substr.name),
1434                        ],
1435                    attribute=[ topdl.Attribute(
1436                        attribute='dragon_endpoint', 
1437                        value=tbparams[tb]['dragon']),
1438                        ]
1439                    )
1440            if tbparams[tb].has_key('vlans'):
1441                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1442            segs.append(seg)
1443
1444        topo["dragon%d" %idx] = \
1445                topdl.Topology(substrates=[substr], elements=segs,
1446                        attribute=[
1447                            topdl.Attribute(attribute="transit", value='true'),
1448                            topdl.Attribute(attribute="dynamic", value='true'),
1449                            topdl.Attribute(attribute="testbed", value='dragon'),
1450                            ]
1451                        )
1452
1453    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
1454            connInfo, peer={ }):
1455        """
1456        Add attribiutes to the various elements indicating that they are to be
1457        dragon connected and create a dragon segment in topo to be
1458        instantiated.
1459        """
1460
1461        def get_substrate_from_topo(name, t):
1462            for s in t.substrates:
1463                if s.name == name: return s
1464            else: return None
1465
1466        mdomain = tbparams[master].get('domain', '.example.com')
1467        mproject = tbparams[master].get('project', 'project')
1468        # dn is the number of previously created dragon nets.  This routine
1469        # creates a net numbered by dn
1470        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1471        # Count the number of interfaces on this substrate in each testbed from
1472        # the global topology
1473        count = { }
1474        node = { }
1475        for e in [ i.element for i in sub.interfaces ]:
1476            tb = e.get_attribute('testbed')
1477            count[tb] = count.get(tb, 0) + 1
1478            node[tb] = i.get_attribute('ip4_address')
1479
1480
1481        # Set the attributes in the copies that will allow setup of dragon
1482        # connections.
1483        for tb in tbs.keys():
1484            s = get_substrate_from_topo(sub.name, topo[tb])
1485            if s:
1486                if not connInfo.has_key(tb):
1487                    connInfo[tb] = [ ]
1488
1489                # This may need another look, but only a service gateway will
1490                # look at the active parameter, and these are only inserted to
1491                # connect to the master.
1492                active = "%s" % ( tb == 'master')
1493                info = {
1494                        'type': 'transit',
1495                        'member': [ {
1496                            'element': i.element.name[0], 
1497                            'interface': i.name
1498                            } for i in s.interfaces \
1499                                    if isinstance(i.element, topdl.Computer) ],
1500                        'fedAttr': [ 
1501                            { 'attribute': 'vlan_id', 
1502                                'value': 'unassigned%d' % dn },
1503                            { 'attribute': 'masterdomain', 'value': mdomain},
1504                            { 'attribute': 'masterexperiment', 'value': 
1505                                "%s/%s" % (mproject, eid)},
1506                            { 'attribute': 'active', 'value': active},
1507                            ],
1508                        }
1509                if peer.has_key(tb):
1510                    info['peer'] = peer[tb]
1511                connInfo[tb].append(info)
1512            else:
1513                raise service_error(service_error.internal,
1514                        "No substrate %s in testbed %s" % (sub.name, tb))
1515
1516        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1517
1518    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1519            segment_substrate, portals, connInfo):
1520        # More than one testbed is on this substrate.  Insert
1521        # some portals into the subtopologies.  st == source testbed,
1522        # dt == destination testbed.
1523        for st in tbs.keys():
1524            if not segment_substrate.has_key(st):
1525                segment_substrate[st] = { }
1526            if not portals.has_key(st): 
1527                portals[st] = { }
1528            if not connInfo.has_key(st):
1529                connInfo[st] = [ ]
1530            for dt in [ t for t in tbs.keys() if t != st]:
1531                sproject = tbparams[st].get('project', 'project')
1532                dproject = tbparams[dt].get('project', 'project')
1533                mproject = tbparams[master].get('project', 'project')
1534                sdomain = tbparams[st].get('domain', ".example.com")
1535                ddomain = tbparams[dt].get('domain', ".example.com")
1536                mdomain = tbparams[master].get('domain', '.example.com')
1537                muser = tbparams[master].get('user', 'root')
1538                smbshare = tbparams[master].get('smbshare', 'USERS')
1539                aid = tbparams[dt]['allocID']['fedid']
1540                if st == master or dt == master:
1541                    active = ("%s" % (st == master))
1542                else:
1543                    active = ("%s" %(st > dt))
1544                if not segment_substrate[st].has_key(dt):
1545                    # Put a substrate and a segment for the connected
1546                    # testbed in there.
1547                    tsubstrate, segment_element = \
1548                            self.new_portal_substrate(st, dt, eid, tbparams)
1549                    segment_substrate[st][dt] = tsubstrate
1550                    topo[st].substrates.append(tsubstrate)
1551                    topo[st].elements.append(segment_element)
1552
1553                new_portal = False
1554                if portals[st].has_key(dt):
1555                    # There's a portal set up to go to this destination.
1556                    # See if there's room to multiplex this connection on
1557                    # it.  If so, add an interface to the portal; if not,
1558                    # set up to add a portal below.
1559                    # [This little festival of braces is just a pop of the
1560                    # last element in the list of portals between st and
1561                    # dt.]
1562                    portal = portals[st][dt][-1]
1563                    mux = len([ i for i in portal.interface \
1564                            if not i.get_attribute('portal')])
1565                    if mux == self.muxmax:
1566                        new_portal = True
1567                        portal_type = "experiment"
1568                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1569                        desthost = "%stunnel%d.%s.%s%s" % (st, 
1570                                len(portals[st][dt]), eid.lower(),
1571                                dproject.lower(), ddomain.lower())
1572                    else:
1573                        new_i = topdl.Interface(
1574                                substrate=sub.name,
1575                                attribute=[ 
1576                                    topdl.Attribute(
1577                                        attribute='ip4_address', 
1578                                        value=tbs[dt]
1579                                    )
1580                                ])
1581                        portal.interface.append(new_i)
1582                else:
1583                    # First connection to this testbed, make an empty list
1584                    # and set up to add the new portal below
1585                    new_portal = True
1586                    portals[st][dt] = [ ]
1587                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1588                    desthost = "%stunnel%d.%s.%s%s" % (st.lower(), 
1589                            len(portals[st][dt]), eid.lower(), 
1590                            dproject.lower(), ddomain.lower())
1591
1592                    if dt == master or st == master: portal_type = "both"
1593                    else: portal_type = "experiment"
1594
1595                if new_portal:
1596                    infs = (
1597                            (segment_substrate[st][dt].name, 
1598                                (('portal', 'true'),)),
1599                            (sub.name, 
1600                                (('ip4_address', tbs[dt]),))
1601                        )
1602                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1603                            master, eid, myname, desthost, portal_type,
1604                            infs)
1605                    if self.fedkit:
1606                        self.add_kit(portal, self.fedkit)
1607                    if self.gatewaykit: 
1608                        self.add_kit(portal, self.gatewaykit)
1609
1610                    topo[st].elements.append(portal)
1611                    portals[st][dt].append(portal)
1612                    connInfo[st].append(info)
1613
1614    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo):
1615        # Add to the master testbed
1616        tsubstrate, segment_element = \
1617                self.new_portal_substrate(st, dt, eid, tbparams)
1618        myname = "%stunnel" % dt
1619        desthost = "%stunnel" % st
1620
1621        portal, info = self.new_portal_node(st, dt, tbparams, master,
1622                eid, myname, desthost, "control", 
1623                ((tsubstrate.name,(('portal','true'),)),))
1624        if self.fedkit:
1625            self.add_kit(portal, self.fedkit)
1626        if self.gatewaykit: 
1627            self.add_kit(portal, self.gatewaykit)
1628
1629        topo[st].substrates.append(tsubstrate)
1630        topo[st].elements.append(segment_element)
1631        topo[st].elements.append(portal)
1632        if not connInfo.has_key(st):
1633            connInfo[st] = [ ]
1634        connInfo[st].append(info)
1635
1636    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1637            substrate, tbparams):
1638        # Add to the master testbed
1639        myname = "%stunnel" % dt
1640        desthost = "%s" % ip_addr(dip)
1641
1642        portal, info = self.new_portal_node(st, dt, tbparams, master,
1643                eid, myname, desthost, "control", 
1644                ((substrate.name,(
1645                    ('portal','true'),
1646                    ('ip4_address', "%s" % ip_addr(myip)),)),),
1647                conn_type="transit")
1648        if self.fedkit:
1649            self.add_kit(portal, self.fedkit)
1650        if self.gatewaykit: 
1651            self.add_kit(portal, self.gatewaykit)
1652
1653        return portal
1654
1655    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1656            connInfo):
1657        """
1658        For each substrate in the main topology, find those that
1659        have nodes on more than one testbed.  Insert portal nodes
1660        into the copies of those substrates on the sub topologies.
1661        """
1662        segment_substrate = { }
1663        portals = { }
1664        for s in top.substrates:
1665            # tbs will contain an ip address on this subsrate that is in
1666            # each testbed.
1667            tbs = { }
1668            for i in s.interfaces:
1669                e = i.element
1670                tb = e.get_attribute('testbed')
1671                if tb and not tbs.has_key(tb):
1672                    for i in e.interface:
1673                        if s in i.subs:
1674                            tbs[tb]= i.get_attribute('ip4_address')
1675            if len(tbs) < 2:
1676                continue
1677
1678            # DRAGON will not create multi-site vlans yet
1679            if len(tbs) == 2 and \
1680                    all([tbparams[x].has_key('dragon') for x in tbs]):
1681                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1682                        master, eid, connInfo)
1683            else:
1684                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1685                        eid, segment_substrate, portals, connInfo)
1686
1687        # Make sure that all the slaves have a control portal back to the
1688        # master.
1689        for tb in [ t for t in tbparams.keys() if t != master ]:
1690            if len([e for e in topo[tb].elements \
1691                    if isinstance(e, topdl.Computer) and \
1692                    e.get_attribute('portal') and \
1693                    e.get_attribute('portal_type') == 'both']) == 0:
1694
1695                if tbparams[master].has_key('dragon') \
1696                        and tbparams[tb].has_key('dragon'):
1697
1698                    idx = len([x for x in topo.keys() \
1699                            if x.startswith('dragon')])
1700                    dip, leng = ip_allocator.allocate(4)
1701                    dip += 1
1702                    mip = dip+1
1703                    csub = topdl.Substrate(
1704                            name="dragon-control-%s" % tb,
1705                            capacity=topdl.Capacity(100000.0, 'max'),
1706                            attribute=[
1707                                topdl.Attribute(
1708                                    attribute='portal',
1709                                    value='true'
1710                                    )
1711                                ]
1712                            )
1713                    seg = topdl.Segment(
1714                            id= tbparams[master]['allocID'],
1715                            type='emulab',
1716                            uri = self.tbmap.get(master, None),
1717                            interface=[ 
1718                                topdl.Interface(
1719                                    substrate=csub.name),
1720                                ],
1721                            attribute = [
1722                                topdl.Attribute(attribute=n, value=v)
1723                                    for n, v in (\
1724                                        ('domain', 
1725                                            tbparams[master].get('domain',
1726                                                ".example.com")),
1727                                        ('experiment', "%s/%s" % \
1728                                                (tbparams[master].get(
1729                                                    'project', 
1730                                                    'project'), 
1731                                                    eid)),)
1732                                ],
1733                            )
1734                    portal = self.new_dragon_portal(tb, master,
1735                            master, eid, dip, mip, idx, csub, tbparams)
1736                    topo[tb].substrates.append(csub)
1737                    topo[tb].elements.append(portal)
1738                    topo[tb].elements.append(seg)
1739
1740                    mcsub = csub.clone()
1741                    seg = topdl.Segment(
1742                            id= tbparams[tb]['allocID'],
1743                            type='emulab',
1744                            uri = self.tbmap.get(tb, None),
1745                            interface=[ 
1746                                topdl.Interface(
1747                                    substrate=csub.name),
1748                                ],
1749                            attribute = [
1750                                topdl.Attribute(attribute=n, value=v)
1751                                    for n, v in (\
1752                                        ('domain', 
1753                                            tbparams[tb].get('domain',
1754                                                ".example.com")),
1755                                        ('experiment', "%s/%s" % \
1756                                                (tbparams[tb].get('project', 
1757                                                    'project'), 
1758                                                    eid)),)
1759                                ],
1760                            )
1761                    portal = self.new_dragon_portal(master, tb, master,
1762                            eid, mip, dip, idx, mcsub, tbparams)
1763                    topo[master].substrates.append(mcsub)
1764                    topo[master].elements.append(portal)
1765                    topo[master].elements.append(seg)
1766                    for t in (master, tb):
1767                        topo[t].incorporate_elements()
1768
1769                    self.create_dragon_substrate(csub, topo, 
1770                            {tb: 1, master:1}, tbparams, master, eid, connInfo,
1771                            {tb: ip_addr(mip), master: ip_addr(dip)})
1772                else:
1773                    self.add_control_portal(master, tb, master, eid, topo, 
1774                            tbparams, connInfo)
1775                    self.add_control_portal(tb, master, master, eid, topo, 
1776                            tbparams, connInfo)
1777
1778        # Connect the portal nodes into the topologies and clear out
1779        # substrates that are not in the topologies
1780        for tb in tbparams.keys():
1781            topo[tb].incorporate_elements()
1782            topo[tb].substrates = \
1783                    [s for s in topo[tb].substrates \
1784                        if len(s.interfaces) >0]
1785
1786    def wrangle_software(self, expid, top, topo, tbparams):
1787        """
1788        Copy software out to the repository directory, allocate permissions and
1789        rewrite the segment topologies to look for the software in local
1790        places.
1791        """
1792
1793        # Copy the rpms and tarfiles to a distribution directory from
1794        # which the federants can retrieve them
1795        linkpath = "%s/software" %  expid
1796        softdir ="%s/%s" % ( self.repodir, linkpath)
1797        softmap = { }
1798        # These are in a list of tuples format (each kit).  This comprehension
1799        # unwraps them into a single list of tuples that initilaizes the set of
1800        # tuples.
1801        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1802                for p, t in l ])
1803        pkgs.update([x.location for e in top.elements \
1804                for x in e.software])
1805        try:
1806            os.makedirs(softdir)
1807        except IOError, e:
1808            raise service_error(
1809                    "Cannot create software directory: %s" % e)
1810        # The actual copying.  Everything's converted into a url for copying.
1811        for pkg in pkgs:
1812            loc = pkg
1813
1814            scheme, host, path = urlparse(loc)[0:3]
1815            dest = os.path.basename(path)
1816            if not scheme:
1817                if not loc.startswith('/'):
1818                    loc = "/%s" % loc
1819                loc = "file://%s" %loc
1820            try:
1821                u = urlopen(loc)
1822            except Exception, e:
1823                raise service_error(service_error.req, 
1824                        "Cannot open %s: %s" % (loc, e))
1825            try:
1826                f = open("%s/%s" % (softdir, dest) , "w")
1827                self.log.debug("Writing %s/%s" % (softdir,dest) )
1828                data = u.read(4096)
1829                while data:
1830                    f.write(data)
1831                    data = u.read(4096)
1832                f.close()
1833                u.close()
1834            except Exception, e:
1835                raise service_error(service_error.internal,
1836                        "Could not copy %s: %s" % (loc, e))
1837            path = re.sub("/tmp", "", linkpath)
1838            # XXX
1839            softmap[pkg] = \
1840                    "%s/%s/%s" %\
1841                    ( self.repo_url, path, dest)
1842
1843            # Allow the individual segments to access the software.
1844            for tb in tbparams.keys():
1845                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1846                        "/%s/%s" % ( path, dest))
1847
1848        # Convert the software locations in the segments into the local
1849        # copies on this host
1850        for soft in [ s for tb in topo.values() \
1851                for e in tb.elements \
1852                    if getattr(e, 'software', False) \
1853                        for s in e.software ]:
1854            if softmap.has_key(soft.location):
1855                soft.location = softmap[soft.location]
1856
1857
1858    def new_experiment(self, req, fid):
1859        """
1860        The external interface to empty initial experiment creation called from
1861        the dispatcher.
1862
1863        Creates a working directory, splits the incoming description using the
1864        splitter script and parses out the avrious subsections using the
1865        lcasses above.  Once each sub-experiment is created, use pooled threads
1866        to instantiate them and start it all up.
1867        """
1868        if not self.auth.check_attribute(fid, 'new'):
1869            raise service_error(service_error.access, "New access denied")
1870
1871        try:
1872            tmpdir = tempfile.mkdtemp(prefix="split-")
1873        except IOError:
1874            raise service_error(service_error.internal, "Cannot create tmp dir")
1875
1876        try:
1877            access_user = self.accessdb[fid]
1878        except KeyError:
1879            raise service_error(service_error.internal,
1880                    "Access map and authorizer out of sync in " + \
1881                            "new_experiment for fedid %s"  % fid)
1882
1883        pid = "dummy"
1884        gid = "dummy"
1885
1886        req = req.get('NewRequestBody', None)
1887        if not req:
1888            raise service_error(service_error.req,
1889                    "Bad request format (no NewRequestBody)")
1890
1891        # Generate an ID for the experiment (slice) and a certificate that the
1892        # allocator can use to prove they own it.  We'll ship it back through
1893        # the encrypted connection.
1894        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1895
1896        #now we're done with the tmpdir, and it should be empty
1897        if self.cleanup:
1898            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1899            os.rmdir(tmpdir)
1900        else:
1901            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1902
1903        eid = self.create_experiment_state(fid, req, expid, expcert, 
1904                state='empty')
1905
1906        # Let users touch the state
1907        self.auth.set_attribute(fid, expid)
1908        self.auth.set_attribute(expid, expid)
1909        # Override fedids can manipulate state as well
1910        for o in self.overrides:
1911            self.auth.set_attribute(o, expid)
1912
1913        rv = {
1914                'experimentID': [
1915                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1916                ],
1917                'experimentStatus': 'empty',
1918                'experimentAccess': { 'X509' : expcert }
1919            }
1920
1921        return rv
1922
1923
1924    def create_experiment(self, req, fid):
1925        """
1926        The external interface to experiment creation called from the
1927        dispatcher.
1928
1929        Creates a working directory, splits the incoming description using the
1930        splitter script and parses out the avrious subsections using the
1931        lcasses above.  Once each sub-experiment is created, use pooled threads
1932        to instantiate them and start it all up.
1933        """
1934
1935        req = req.get('CreateRequestBody', None)
1936        if not req:
1937            raise service_error(service_error.req,
1938                    "Bad request format (no CreateRequestBody)")
1939
1940        # Get the experiment access
1941        exp = req.get('experimentID', None)
1942        if exp:
1943            if exp.has_key('fedid'):
1944                key = exp['fedid']
1945                expid = key
1946                eid = None
1947            elif exp.has_key('localname'):
1948                key = exp['localname']
1949                eid = key
1950                expid = None
1951            else:
1952                raise service_error(service_error.req, "Unknown lookup type")
1953        else:
1954            raise service_error(service_error.req, "No request?")
1955
1956        self.check_experiment_access(fid, key)
1957
1958        try:
1959            tmpdir = tempfile.mkdtemp(prefix="split-")
1960            os.mkdir(tmpdir+"/keys")
1961        except IOError:
1962            raise service_error(service_error.internal, "Cannot create tmp dir")
1963
1964        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1965        gw_secretkey_base = "fed.%s" % self.ssh_type
1966        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1967        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1968        tclfile = tmpdir + "/experiment.tcl"
1969        tbparams = { }
1970        try:
1971            access_user = self.accessdb[fid]
1972        except KeyError:
1973            raise service_error(service_error.internal,
1974                    "Access map and authorizer out of sync in " + \
1975                            "create_experiment for fedid %s"  % fid)
1976
1977        pid = "dummy"
1978        gid = "dummy"
1979
1980        # The tcl parser needs to read a file so put the content into that file
1981        descr=req.get('experimentdescription', None)
1982        if descr:
1983            file_content=descr.get('ns2description', None)
1984            if file_content:
1985                try:
1986                    f = open(tclfile, 'w')
1987                    f.write(file_content)
1988                    f.close()
1989                except IOError:
1990                    raise service_error(service_error.internal,
1991                            "Cannot write temp experiment description")
1992            else:
1993                raise service_error(service_error.req, 
1994                        "Only ns2descriptions supported")
1995        else:
1996            raise service_error(service_error.req, "No experiment description")
1997
1998        self.state_lock.acquire()
1999        if self.state.has_key(key):
2000            self.state[key]['experimentStatus'] = "starting"
2001            for e in self.state[key].get('experimentID',[]):
2002                if not expid and e.has_key('fedid'):
2003                    expid = e['fedid']
2004                elif not eid and e.has_key('localname'):
2005                    eid = e['localname']
2006        self.state_lock.release()
2007
2008        if not (eid and expid):
2009            raise service_error(service_error.internal, 
2010                    "Cannot find local experiment info!?")
2011
2012        try: 
2013            # This catches exceptions to clear the placeholder if necessary
2014            try:
2015                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2016            except ValueError:
2017                raise service_error(service_error.server_config, 
2018                        "Bad key type (%s)" % self.ssh_type)
2019
2020            master = req.get('master', None)
2021            if not master:
2022                raise service_error(service_error.req,
2023                        "No master testbed label")
2024            export_project = req.get('exportProject', None)
2025            if not export_project:
2026                raise service_error(service_error.req, "No export project")
2027           
2028            # Translate to topdl
2029            if self.splitter_url:
2030                # XXX: need remote topdl translator
2031                self.log.debug("Calling remote splitter at %s" % \
2032                        self.splitter_url)
2033                split_data = self.remote_splitter(self.splitter_url,
2034                        file_content, master)
2035            else:
2036                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2037                    str(self.muxmax), '-m', master]
2038
2039                if self.fedkit:
2040                    tclcmd.append('-k')
2041
2042                if self.gatewaykit:
2043                    tclcmd.append('-K')
2044
2045                tclcmd.extend([pid, gid, eid, tclfile])
2046
2047                self.log.debug("running local splitter %s", " ".join(tclcmd))
2048                # This is just fantastic.  As a side effect the parser copies
2049                # tb_compat.tcl into the current directory, so that directory
2050                # must be writable by the fedd user.  Doing this in the
2051                # temporary subdir ensures this is the case.
2052                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2053                        cwd=tmpdir)
2054                split_data = tclparser.stdout
2055
2056            top = topdl.topology_from_xml(file=split_data, top="experiment")
2057
2058            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2059             # Find the testbeds to look up
2060            testbeds = set([ a.value for e in top.elements \
2061                    for a in e.attribute \
2062                    if a.attribute == 'testbed'] )
2063
2064            allocated = { }         # Testbeds we can access
2065            topo ={ }               # Sub topologies
2066            connInfo = { }          # Connection information
2067            services = [ ]
2068            self.get_access_to_testbeds(testbeds, access_user, 
2069                    export_project, master, allocated, tbparams, services)
2070            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2071
2072            # Copy configuration files into the remote file store
2073            # The config urlpath
2074            configpath = "/%s/config" % expid
2075            # The config file system location
2076            configdir ="%s%s" % ( self.repodir, configpath)
2077            try:
2078                os.makedirs(configdir)
2079            except IOError, e:
2080                raise service_error(
2081                        "Cannot create config directory: %s" % e)
2082            try:
2083                f = open("%s/hosts" % configdir, "w")
2084                f.write('\n'.join(hosts))
2085                f.close()
2086            except IOError, e:
2087                raise service_error(service_error.internal, 
2088                        "Cannot write hosts file: %s" % e)
2089            try:
2090                copy_file("%s" % gw_pubkey, "%s/%s" % \
2091                        (configdir, gw_pubkey_base))
2092                copy_file("%s" % gw_secretkey, "%s/%s" % \
2093                        (configdir, gw_secretkey_base))
2094            except IOError, e:
2095                raise service_error(service_error.internal, 
2096                        "Cannot copy keyfiles: %s" % e)
2097
2098            # Allow the individual testbeds to access the configuration files.
2099            for tb in tbparams.keys():
2100                asignee = tbparams[tb]['allocID']['fedid']
2101                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2102                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2103
2104            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2105                    connInfo)
2106            # Now get access to the dynamic testbeds
2107            for k, t in topo.items():
2108                if not t.get_attribute('dynamic'):
2109                    continue
2110                tb = t.get_attribute('testbed')
2111                if tb: 
2112                    self.get_access(tb, None, tbparams, master, 
2113                            export_project, access_user, services)
2114                    tbparams[k] = tbparams[tb]
2115                    del tbparams[tb]
2116                    allocated[k] = 1
2117                else:
2118                    raise service_error(service_error.internal, 
2119                            "Dynamic allocation from no testbed!?")
2120
2121            self.wrangle_software(expid, top, topo, tbparams)
2122
2123            vtopo = topdl.topology_to_vtopo(top)
2124            vis = self.genviz(vtopo)
2125
2126            # save federant information
2127            for k in allocated.keys():
2128                tbparams[k]['federant'] = {
2129                        'name': [ { 'localname' : eid} ],
2130                        'allocID' : tbparams[k]['allocID'],
2131                        'master' : k == master,
2132                        'uri': tbparams[k]['uri'],
2133                    }
2134                if tbparams[k].has_key('emulab'):
2135                        tbparams[k]['federant']['emulab'] = \
2136                                tbparams[k]['emulab']
2137
2138            self.state_lock.acquire()
2139            self.state[eid]['vtopo'] = vtopo
2140            self.state[eid]['vis'] = vis
2141            self.state[expid]['federant'] = \
2142                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2143                        if tbparams[tb].has_key('federant') ]
2144            if self.state_filename: 
2145                self.write_state()
2146            self.state_lock.release()
2147        except service_error, e:
2148            # If something goes wrong in the parse (usually an access error)
2149            # clear the placeholder state.  From here on out the code delays
2150            # exceptions.  Failing at this point returns a fault to the remote
2151            # caller.
2152
2153            self.state_lock.acquire()
2154            del self.state[eid]
2155            del self.state[expid]
2156            if self.state_filename: self.write_state()
2157            self.state_lock.release()
2158            raise e
2159
2160
2161        # Start the background swapper and return the starting state.  From
2162        # here on out, the state will stick around a while.
2163
2164        # Let users touch the state
2165        self.auth.set_attribute(fid, expid)
2166        self.auth.set_attribute(expid, expid)
2167        # Override fedids can manipulate state as well
2168        for o in self.overrides:
2169            self.auth.set_attribute(o, expid)
2170
2171        # Create a logger that logs to the experiment's state object as well as
2172        # to the main log file.
2173        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2174        alloc_collector = self.list_log(self.state[eid]['log'])
2175        h = logging.StreamHandler(alloc_collector)
2176        # XXX: there should be a global one of these rather than repeating the
2177        # code.
2178        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2179                    '%d %b %y %H:%M:%S'))
2180        alloc_log.addHandler(h)
2181       
2182        attrs = [ 
2183                {
2184                    'attribute': 'ssh_pubkey', 
2185                    'value': '%s/%s/config/%s' % \
2186                            (self.repo_url, expid, gw_pubkey_base)
2187                },
2188                {
2189                    'attribute': 'ssh_secretkey', 
2190                    'value': '%s/%s/config/%s' % \
2191                            (self.repo_url, expid, gw_secretkey_base)
2192                },
2193                {
2194                    'attribute': 'hosts', 
2195                    'value': '%s/%s/config/hosts' % \
2196                            (self.repo_url, expid)
2197                },
2198                {
2199                    'attribute': 'experiment_name',
2200                    'value': eid,
2201                },
2202            ]
2203
2204        # transit and disconnected testbeds may not have a connInfo entry.
2205        # Fill in the blanks.
2206        for t in allocated.keys():
2207            if not connInfo.has_key(t):
2208                connInfo[t] = { }
2209
2210        # Start a thread to do the resource allocation
2211        t  = Thread(target=self.allocate_resources,
2212                args=(allocated, master, eid, expid, tbparams, 
2213                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2214                    services),
2215                name=eid)
2216        t.start()
2217
2218        rv = {
2219                'experimentID': [
2220                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2221                ],
2222                'experimentStatus': 'starting',
2223            }
2224
2225        return rv
2226   
2227    def get_experiment_fedid(self, key):
2228        """
2229        find the fedid associated with the localname key in the state database.
2230        """
2231
2232        rv = None
2233        self.state_lock.acquire()
2234        if self.state.has_key(key):
2235            if isinstance(self.state[key], dict):
2236                try:
2237                    kl = [ f['fedid'] for f in \
2238                            self.state[key]['experimentID']\
2239                                if f.has_key('fedid') ]
2240                except KeyError:
2241                    self.state_lock.release()
2242                    raise service_error(service_error.internal, 
2243                            "No fedid for experiment %s when getting "+\
2244                                    "fedid(!?)" % key)
2245                if len(kl) == 1:
2246                    rv = kl[0]
2247                else:
2248                    self.state_lock.release()
2249                    raise service_error(service_error.internal, 
2250                            "multiple fedids for experiment %s when " +\
2251                                    "getting fedid(!?)" % key)
2252            else:
2253                self.state_lock.release()
2254                raise service_error(service_error.internal, 
2255                        "Unexpected state for %s" % key)
2256        self.state_lock.release()
2257        return rv
2258
2259    def check_experiment_access(self, fid, key):
2260        """
2261        Confirm that the fid has access to the experiment.  Though a request
2262        may be made in terms of a local name, the access attribute is always
2263        the experiment's fedid.
2264        """
2265        if not isinstance(key, fedid):
2266            key = self.get_experiment_fedid(key)
2267
2268        if self.auth.check_attribute(fid, key):
2269            return True
2270        else:
2271            raise service_error(service_error.access, "Access Denied")
2272
2273
2274    def get_handler(self, path, fid):
2275        self.log.info("Get handler %s %s" % (path, fid))
2276        if self.auth.check_attribute(fid, path):
2277            return ("%s/%s" % (self.repodir, path), "application/binary")
2278        else:
2279            return (None, None)
2280
2281    def get_vtopo(self, req, fid):
2282        """
2283        Return the stored virtual topology for this experiment
2284        """
2285        rv = None
2286        state = None
2287
2288        req = req.get('VtopoRequestBody', None)
2289        if not req:
2290            raise service_error(service_error.req,
2291                    "Bad request format (no VtopoRequestBody)")
2292        exp = req.get('experiment', None)
2293        if exp:
2294            if exp.has_key('fedid'):
2295                key = exp['fedid']
2296                keytype = "fedid"
2297            elif exp.has_key('localname'):
2298                key = exp['localname']
2299                keytype = "localname"
2300            else:
2301                raise service_error(service_error.req, "Unknown lookup type")
2302        else:
2303            raise service_error(service_error.req, "No request?")
2304
2305        self.check_experiment_access(fid, key)
2306
2307        self.state_lock.acquire()
2308        if self.state.has_key(key):
2309            if self.state[key].has_key('vtopo'):
2310                rv = { 'experiment' : {keytype: key },\
2311                        'vtopo': self.state[key]['vtopo'],\
2312                    }
2313            else:
2314                state = self.state[key]['experimentStatus']
2315        self.state_lock.release()
2316
2317        if rv: return rv
2318        else: 
2319            if state:
2320                raise service_error(service_error.partial, 
2321                        "Not ready: %s" % state)
2322            else:
2323                raise service_error(service_error.req, "No such experiment")
2324
2325    def get_vis(self, req, fid):
2326        """
2327        Return the stored visualization for this experiment
2328        """
2329        rv = None
2330        state = None
2331
2332        req = req.get('VisRequestBody', None)
2333        if not req:
2334            raise service_error(service_error.req,
2335                    "Bad request format (no VisRequestBody)")
2336        exp = req.get('experiment', None)
2337        if exp:
2338            if exp.has_key('fedid'):
2339                key = exp['fedid']
2340                keytype = "fedid"
2341            elif exp.has_key('localname'):
2342                key = exp['localname']
2343                keytype = "localname"
2344            else:
2345                raise service_error(service_error.req, "Unknown lookup type")
2346        else:
2347            raise service_error(service_error.req, "No request?")
2348
2349        self.check_experiment_access(fid, key)
2350
2351        self.state_lock.acquire()
2352        if self.state.has_key(key):
2353            if self.state[key].has_key('vis'):
2354                rv =  { 'experiment' : {keytype: key },\
2355                        'vis': self.state[key]['vis'],\
2356                        }
2357            else:
2358                state = self.state[key]['experimentStatus']
2359        self.state_lock.release()
2360
2361        if rv: return rv
2362        else:
2363            if state:
2364                raise service_error(service_error.partial, 
2365                        "Not ready: %s" % state)
2366            else:
2367                raise service_error(service_error.req, "No such experiment")
2368
2369    def clean_info_response(self, rv):
2370        """
2371        Remove the information in the experiment's state object that is not in
2372        the info response.
2373        """
2374        # Remove the owner info (should always be there, but...)
2375        if rv.has_key('owner'): del rv['owner']
2376
2377        # Convert the log into the allocationLog parameter and remove the
2378        # log entry (with defensive programming)
2379        if rv.has_key('log'):
2380            rv['allocationLog'] = "".join(rv['log'])
2381            del rv['log']
2382        else:
2383            rv['allocationLog'] = ""
2384
2385        if rv['experimentStatus'] != 'active':
2386            if rv.has_key('federant'): del rv['federant']
2387        else:
2388            # remove the allocationID and uri info from each federant
2389            for f in rv.get('federant', []):
2390                if f.has_key('allocID'): del f['allocID']
2391                if f.has_key('uri'): del f['uri']
2392        return rv
2393
2394    def get_info(self, req, fid):
2395        """
2396        Return all the stored info about this experiment
2397        """
2398        rv = None
2399
2400        req = req.get('InfoRequestBody', None)
2401        if not req:
2402            raise service_error(service_error.req,
2403                    "Bad request format (no InfoRequestBody)")
2404        exp = req.get('experiment', None)
2405        if exp:
2406            if exp.has_key('fedid'):
2407                key = exp['fedid']
2408                keytype = "fedid"
2409            elif exp.has_key('localname'):
2410                key = exp['localname']
2411                keytype = "localname"
2412            else:
2413                raise service_error(service_error.req, "Unknown lookup type")
2414        else:
2415            raise service_error(service_error.req, "No request?")
2416
2417        self.check_experiment_access(fid, key)
2418
2419        # The state may be massaged by the service function that called
2420        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2421        # state.
2422        self.state_lock.acquire()
2423        if self.state.has_key(key):
2424            rv = copy.deepcopy(self.state[key])
2425        self.state_lock.release()
2426
2427        if rv:
2428            return self.clean_info_response(rv)
2429        else:
2430            raise service_error(service_error.req, "No such experiment")
2431
2432    def get_multi_info(self, req, fid):
2433        """
2434        Return all the stored info that this fedid can access
2435        """
2436        rv = { 'info': [ ] }
2437
2438        self.state_lock.acquire()
2439        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2440            try:
2441                self.check_experiment_access(fid, key)
2442            except service_error, e:
2443                if e.code == service_error.access:
2444                    continue
2445                else:
2446                    self.state_lock.release()
2447                    raise e
2448
2449            if self.state.has_key(key):
2450                e = copy.deepcopy(self.state[key])
2451                e = self.clean_info_response(e)
2452                rv['info'].append(e)
2453        self.state_lock.release()
2454        return rv
2455
2456    def terminate_experiment(self, req, fid):
2457        """
2458        Swap this experiment out on the federants and delete the shared
2459        information
2460        """
2461        tbparams = { }
2462        req = req.get('TerminateRequestBody', None)
2463        if not req:
2464            raise service_error(service_error.req,
2465                    "Bad request format (no TerminateRequestBody)")
2466        force = req.get('force', False)
2467        exp = req.get('experiment', None)
2468        if exp:
2469            if exp.has_key('fedid'):
2470                key = exp['fedid']
2471                keytype = "fedid"
2472            elif exp.has_key('localname'):
2473                key = exp['localname']
2474                keytype = "localname"
2475            else:
2476                raise service_error(service_error.req, "Unknown lookup type")
2477        else:
2478            raise service_error(service_error.req, "No request?")
2479
2480        self.check_experiment_access(fid, key)
2481
2482        dealloc_list = [ ]
2483
2484
2485        # Create a logger that logs to the dealloc_list as well as to the main
2486        # log file.
2487        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2488        h = logging.StreamHandler(self.list_log(dealloc_list))
2489        # XXX: there should be a global one of these rather than repeating the
2490        # code.
2491        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2492                    '%d %b %y %H:%M:%S'))
2493        dealloc_log.addHandler(h)
2494
2495        self.state_lock.acquire()
2496        fed_exp = self.state.get(key, None)
2497
2498        if fed_exp:
2499            # This branch of the conditional holds the lock to generate a
2500            # consistent temporary tbparams variable to deallocate experiments.
2501            # It releases the lock to do the deallocations and reacquires it to
2502            # remove the experiment state when the termination is complete.
2503
2504            # First make sure that the experiment creation is complete.
2505            status = fed_exp.get('experimentStatus', None)
2506
2507            if status:
2508                if status in ('starting', 'terminating'):
2509                    if not force:
2510                        self.state_lock.release()
2511                        raise service_error(service_error.partial, 
2512                                'Experiment still being created or destroyed')
2513                    else:
2514                        self.log.warning('Experiment in %s state ' % status + \
2515                                'being terminated by force.')
2516            else:
2517                # No status??? trouble
2518                self.state_lock.release()
2519                raise service_error(service_error.internal,
2520                        "Experiment has no status!?")
2521
2522            ids = []
2523            #  experimentID is a list of dicts that are self-describing
2524            #  identifiers.  This finds all the fedids and localnames - the
2525            #  keys of self.state - and puts them into ids.
2526            for id in fed_exp.get('experimentID', []):
2527                if id.has_key('fedid'): ids.append(id['fedid'])
2528                if id.has_key('localname'): ids.append(id['localname'])
2529
2530            # Collect the allocation/segment ids into a dict keyed by the fedid
2531            # of the allocation (or a monotonically increasing integer) that
2532            # contains a tuple of uri, aid (which is a dict...)
2533            for i, fed in enumerate(fed_exp.get('federant', [])):
2534                try:
2535                    uri = fed['uri']
2536                    aid = fed['allocID']
2537                    k = fed['allocID'].get('fedid', i)
2538                except KeyError, e:
2539                    continue
2540                tbparams[k] = (uri, aid)
2541            fed_exp['experimentStatus'] = 'terminating'
2542            if self.state_filename: self.write_state()
2543            self.state_lock.release()
2544
2545            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2546            # then completes, so we can't wait if nothing starts.  So, no
2547            # tbparams, no start.
2548            if len(tbparams) > 0:
2549                thread_pool = self.thread_pool(self.nthreads)
2550                for k in tbparams.keys():
2551                    # Create and start a thread to stop the segment
2552                    thread_pool.wait_for_slot()
2553                    uri, aid = tbparams[k]
2554                    t  = self.pooled_thread(\
2555                            target=self.terminate_segment(log=dealloc_log,
2556                                testbed=uri,
2557                                cert_file=self.cert_file, 
2558                                cert_pwd=self.cert_pwd,
2559                                trusted_certs=self.trusted_certs,
2560                                caller=self.call_TerminateSegment),
2561                            args=(uri, aid), name=k,
2562                            pdata=thread_pool, trace_file=self.trace_file)
2563                    t.start()
2564                # Wait for completions
2565                thread_pool.wait_for_all_done()
2566
2567            # release the allocations (failed experiments have done this
2568            # already, and starting experiments may be in odd states, so we
2569            # ignore errors releasing those allocations
2570            try: 
2571                for k in tbparams.keys():
2572                    # This releases access by uri
2573                    uri, aid = tbparams[k]
2574                    self.release_access(None, aid, uri=uri)
2575            except service_error, e:
2576                if status != 'failed' and not force:
2577                    raise e
2578
2579            # Remove the terminated experiment
2580            self.state_lock.acquire()
2581            for id in ids:
2582                if self.state.has_key(id): del self.state[id]
2583
2584            if self.state_filename: self.write_state()
2585            self.state_lock.release()
2586
2587            return { 
2588                    'experiment': exp , 
2589                    'deallocationLog': "".join(dealloc_list),
2590                    }
2591        else:
2592            # Don't forget to release the lock
2593            self.state_lock.release()
2594            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.