source: fedd/federation/experiment_control.py @ 3bddd24

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 3bddd24 was 3bddd24, checked in by Ted Faber <faber@…>, 14 years ago

moving toward credentials, and away from emulab specifics

  • Property mode set to 100644
File size: 88.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, user, tbparam, master, export_project,
728            access_user):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # currently this lumps all users into one service access group
738        service_keys = [ a for u in user \
739                for a in u.get('access', []) \
740                    if a.has_key('sshPubkey')]
741
742        if len(service_keys) == 0:
743            raise service_error(service_error.req, 
744                    "Must have at least one SSH pubkey for services")
745
746        # Tweak search order so that if there are entries in access_user that
747        # have a project matching the export project, we try them first
748        if export_project and export_project.has_key('localname'): 
749            pn = export_project['localname'] 
750               
751            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
752            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
753        else: 
754            access_sequence = access_user
755
756        for p, u in access_sequence: 
757            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
758                    "to %s") %  ((p or "None"), u, uri))
759
760            if p:
761                # Request with user and project specified
762                req = {\
763                        'destinationTestbed' : { 'uri' : uri },
764                        'credential': [ "project: %s" % p, "user: %s"  % u],
765                        'allocID' : { 'localname': 'test' },
766                    }
767            else:
768                # Request with only user specified
769                req = {\
770                        'destinationTestbed' : { 'uri' : uri },
771                        'credential': [ 'user: %s' % u ],
772                        'user':  [ {'userID': { 'localname': u } } ],
773                        'allocID' : { 'localname': 'test' },
774                    }
775
776            if tb == master:
777                # NB, the export_project parameter is a dict that includes
778                # the type
779                req['exportProject'] = export_project
780
781            # node resources if any
782            if nodes != None and len(nodes) > 0:
783                rnodes = [ ]
784                for n in nodes:
785                    rn = { }
786                    image, hw, count = n.split(":")
787                    if image: rn['image'] = [ image ]
788                    if hw: rn['hardware'] = [ hw ]
789                    if count and int(count) >0 : rn['count'] = int(count)
790                    rnodes.append(rn)
791                req['resources']= { }
792                req['resources']['node'] = rnodes
793
794            try:
795                if self.local_access.has_key(uri):
796                    # Local access call
797                    req = { 'RequestAccessRequestBody' : req }
798                    r = self.local_access[uri].RequestAccess(req, 
799                            fedid(file=self.cert_file))
800                    r = { 'RequestAccessResponseBody' : r }
801                else:
802                    r = self.call_RequestAccess(uri, req, 
803                            self.cert_file, self.cert_pwd, self.trusted_certs)
804            except service_error, e:
805                if e.code == service_error.access:
806                    self.log.debug("[get_access] Access denied")
807                    r = None
808                    continue
809                else:
810                    raise e
811
812            if r.has_key('RequestAccessResponseBody'):
813                # Through to here we have a valid response, not a fault.
814                # Access denied is a fault, so something better or worse than
815                # access denied has happened.
816                r = r['RequestAccessResponseBody']
817                self.log.debug("[get_access] Access granted")
818                break
819            else:
820                raise service_error(service_error.protocol,
821                        "Bad proxy response")
822       
823        if not r:
824            raise service_error(service_error.access, 
825                    "Access denied by %s (%s)" % (tb, uri))
826
827        if r.has_key('emulab'):
828            e = r['emulab']
829            p = e['project']
830            tbparam[tb] = { 
831                    "boss": e['boss'],
832                    "host": e['ops'],
833                    "domain": e['domain'],
834                    "fs": e['fileServer'],
835                    "eventserver": e['eventServer'],
836                    "project": unpack_id(p['name']),
837                    "emulab" : e,
838                    "allocID" : r['allocID'],
839                    "uri": uri,
840                    }
841            # Make the testbed name be the label the user applied
842            p['testbed'] = {'localname': tb }
843
844            for u in p['user']:
845                role = u.get('role', None)
846                if role == 'experimentCreation':
847                    tbparam[tb]['user'] = unpack_id(u['userID'])
848                    break
849            else:
850                raise service_error(service_error.internal, 
851                        "No createExperimentUser from %s" %tb)
852            # Add attributes to parameter space.  We don't allow attributes to
853            # overlay any parameters already installed.
854            for a in e['fedAttr']:
855                try:
856                    if a['attribute'] and \
857                            isinstance(a['attribute'], basestring)\
858                            and not tbparam[tb].has_key(a['attribute'].lower()):
859                        tbparam[tb][a['attribute'].lower()] = a['value']
860                except KeyError:
861                    self.log.error("Bad attribute in response: %s" % a)
862        else:
863            tbparam[tb] = { 
864                "allocID" : r['allocID'],
865                "uri": uri,
866            }
867
868    def release_access(self, tb, aid, uri=None):
869        """
870        Release access to testbed through fedd
871        """
872
873        if not uri:
874            uri = self.tbmap.get(tb, None)
875        if not uri:
876            raise service_error(service_error.server_config, 
877                    "Unknown testbed: %s" % tb)
878
879        if self.local_access.has_key(uri):
880            resp = self.local_access[uri].ReleaseAccess(\
881                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
882                    fedid(file=self.cert_file))
883            resp = { 'ReleaseAccessResponseBody': resp } 
884        else:
885            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
886                    self.cert_file, self.cert_pwd, self.trusted_certs)
887
888        # better error coding
889
890    def remote_splitter(self, uri, desc, master):
891
892        req = {
893                'description' : { 'ns2description': desc },
894                'master': master,
895                'include_fedkit': bool(self.fedkit),
896                'include_gatewaykit': bool(self.gatewaykit)
897            }
898
899        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
900                self.trusted_certs)
901
902        if r.has_key('Ns2SplitResponseBody'):
903            r = r['Ns2SplitResponseBody']
904            if r.has_key('output'):
905                return r['output'].splitlines()
906            else:
907                raise service_error(service_error.protocol, 
908                        "Bad splitter response (no output)")
909        else:
910            raise service_error(service_error.protocol, "Bad splitter response")
911
912    class start_segment:
913        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
914                cert_pwd=None, trusted_certs=None, caller=None,
915                log_collector=None):
916            self.log = log
917            self.debug = debug
918            self.cert_file = cert_file
919            self.cert_pwd = cert_pwd
920            self.trusted_certs = None
921            self.caller = caller
922            self.testbed = testbed
923            self.log_collector = log_collector
924            self.response = None
925
926        def __call__(self, uri, aid, topo, master, attrs=None):
927            req = {
928                    'allocID': { 'fedid' : aid }, 
929                    'segmentdescription': { 
930                        'topdldescription': topo.to_dict(),
931                    },
932                    'master': master,
933                }
934            if attrs:
935                req['fedAttr'] = attrs
936
937            try:
938                self.log.debug("Calling StartSegment at %s " % uri)
939                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
940                        self.trusted_certs)
941                if r.has_key('StartSegmentResponseBody'):
942                    lval = r['StartSegmentResponseBody'].get('allocationLog',
943                            None)
944                    if lval and self.log_collector:
945                        for line in  lval.splitlines(True):
946                            self.log_collector.write(line)
947                    self.response = r
948                else:
949                    raise service_error(service_error.internal, 
950                            "Bad response!?: %s" %r)
951                return True
952            except service_error, e:
953                self.log.error("Start segment failed on %s: %s" % \
954                        (self.testbed, e))
955                return False
956
957
958
959    class terminate_segment:
960        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
961                cert_pwd=None, trusted_certs=None, caller=None):
962            self.log = log
963            self.debug = debug
964            self.cert_file = cert_file
965            self.cert_pwd = cert_pwd
966            self.trusted_certs = None
967            self.caller = caller
968            self.testbed = testbed
969
970        def __call__(self, uri, aid ):
971            req = {
972                    'allocID': aid , 
973                }
974            try:
975                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
976                        self.trusted_certs)
977                return True
978            except service_error, e:
979                self.log.error("Terminate segment failed on %s: %s" % \
980                        (self.testbed, e))
981                return False
982   
983
984    def allocate_resources(self, allocated, master, eid, expid, 
985            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
986            attrs=None):
987        def get_vlan(r):
988            if r.has_key('StartSegmentResponseBody'):
989                srb = r['StartSegmentResponseBody']
990                if srb.has_key('fedAttr'):
991                    for k, v in [ (a['attribute'], a['value']) \
992                            for a in srb['fedAttr']]:
993                        if k == 'vlan': return v
994            return None
995
996        started = { }           # Testbeds where a sub-experiment started
997                                # successfully
998
999        # XXX
1000        fail_soft = False
1001
1002        slaves = [ k for k in allocated.keys() \
1003                if k != master and not topo[k].get_attribute('transit')]
1004        transit = [ k for k in allocated.keys() \
1005                if topo[k].get_attribute('transit')]
1006
1007        log = alloc_log or self.log
1008
1009        thread_pool = self.thread_pool(self.nthreads)
1010        threads = [ ]
1011
1012        for tb in transit:
1013            uri = tbparams[tb]['uri']
1014            if tbparams[tb].has_key('allocID') and \
1015                    tbparams[tb]['allocID'].has_key('fedid'):
1016                aid = tbparams[tb]['allocID']['fedid']
1017            else:
1018                raise service_error(service_error.internal, 
1019                        "No alloc id for testbed %s !?" % tb)
1020
1021            m = re.search('(\d+)', tb)
1022            if m:
1023                to_repl = "unassigned%s" % m.group(1)
1024            else:
1025                raise service_error(service_error.internal, 
1026                        "Bad dynamic allocation name")
1027                break
1028
1029            ss = self.start_segment(log=log, debug=self.debug, 
1030                testbed=tb, cert_file=self.cert_file, 
1031                cert_pwd=self.cert_pwd, 
1032                trusted_certs=self.trusted_certs,
1033                caller=self.call_StartSegment,
1034                log_collector=log_collector)
1035            t = self.pooled_thread(
1036                    target=ss,
1037                    args =(uri, aid, topo[tb], False, attrs),
1038                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1039            threads.append(t)
1040            t.start()
1041            # Wait until the this transit node finishes (keep pinging the log,
1042            # though)
1043
1044            mins = 0
1045            while not thread_pool.wait_for_all_done(60.0):
1046                mins += 1
1047                alloc_log.info("Waiting for transit (it has been %d mins)" \
1048                        % mins)
1049
1050            if t.rv:
1051                vlan = get_vlan(ss.response)
1052                if vlan is not None:
1053                    for k, t in topo.items():
1054                        for e in t.elements:
1055                            for i in e.interface:
1056                                vl = i.get_attribute('dragon_vlan')
1057                                if vl is not None and vl == to_repl:
1058                                    i.set_attribute('dragon_vlan', vlan)
1059            else:
1060                break
1061            thread_pool.clear()
1062
1063
1064        failed = [ t.getName() for t in threads if not t.rv ]
1065
1066        if len(failed) == 0:
1067            for tb in slaves:
1068                # Create and start a thread to start the segment, and save it
1069                # to get the return value later
1070                thread_pool.wait_for_slot()
1071                uri = self.tbmap.get(tb, None)
1072                if not uri:
1073                    raise service_error(service_error.internal, 
1074                            "Unknown testbed %s !?" % tb)
1075
1076                if tbparams[tb].has_key('allocID') and \
1077                        tbparams[tb]['allocID'].has_key('fedid'):
1078                    aid = tbparams[tb]['allocID']['fedid']
1079                else:
1080                    raise service_error(service_error.internal, 
1081                            "No alloc id for testbed %s !?" % tb)
1082
1083                t  = self.pooled_thread(\
1084                        target=self.start_segment(log=log, debug=self.debug,
1085                            testbed=tb, cert_file=self.cert_file,
1086                            cert_pwd=self.cert_pwd,
1087                            trusted_certs=self.trusted_certs,
1088                            caller=self.call_StartSegment,
1089                            log_collector=log_collector), 
1090                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1091                        pdata=thread_pool, trace_file=self.trace_file)
1092                threads.append(t)
1093                t.start()
1094
1095            # Wait until all finish (keep pinging the log, though)
1096            mins = 0
1097            while not thread_pool.wait_for_all_done(60.0):
1098                mins += 1
1099                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1100                        % mins)
1101
1102            thread_pool.clear()
1103
1104        # If none failed, start the master
1105        failed = [ t.getName() for t in threads if not t.rv ]
1106
1107        if len(failed) == 0:
1108            uri = self.tbmap.get(master, None)
1109            if not uri:
1110                raise service_error(service_error.internal, 
1111                        "Unknown testbed %s !?" % master)
1112
1113            if tbparams[master].has_key('allocID') and \
1114                    tbparams[master]['allocID'].has_key('fedid'):
1115                aid = tbparams[master]['allocID']['fedid']
1116            else:
1117                raise service_error(service_error.internal, 
1118                    "No alloc id for testbed %s !?" % master)
1119            t = self.pooled_thread(
1120                    target=self.start_segment(log=log, debug=self.debug, 
1121                        testbed=master, cert_file=self.cert_file, 
1122                        cert_pwd=self.cert_pwd, 
1123                        trusted_certs=self.trusted_certs,
1124                        caller=self.call_StartSegment,
1125                        log_collector=log_collector),
1126                    args =(uri, aid, topo[master], True, attrs),
1127                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1128            threads.append(t)
1129            t.start()
1130            # Wait until the master finishes (keep pinging the log, though)
1131            mins = 0
1132            while not thread_pool.wait_for_all_done(60.0):
1133                mins += 1
1134                alloc_log.info("Waiting for master (it has been %d mins)" \
1135                        % mins)
1136            # update failed to include the master, if it failed
1137            failed = [ t.getName() for t in threads if not t.rv ]
1138
1139        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1140        # If one failed clean up, unless fail_soft is set
1141        if failed:
1142            if not fail_soft:
1143                thread_pool.clear()
1144                for tb in succeeded:
1145                    # Create and start a thread to stop the segment
1146                    thread_pool.wait_for_slot()
1147                    uri = tbparams[tb]['uri']
1148                    t  = self.pooled_thread(\
1149                            target=self.terminate_segment(log=log,
1150                                testbed=tb,
1151                                cert_file=self.cert_file, 
1152                                cert_pwd=self.cert_pwd,
1153                                trusted_certs=self.trusted_certs,
1154                                caller=self.call_TerminateSegment),
1155                            args=(uri, tbparams[tb]['federant']['allocID']),
1156                            name=tb,
1157                            pdata=thread_pool, trace_file=self.trace_file)
1158                    t.start()
1159                # Wait until all finish
1160                thread_pool.wait_for_all_done()
1161
1162                # release the allocations
1163                for tb in tbparams.keys():
1164                    self.release_access(tb, tbparams[tb]['allocID'],
1165                            tbparams[tb].get('uri', None))
1166                # Remove the placeholder
1167                self.state_lock.acquire()
1168                self.state[eid]['experimentStatus'] = 'failed'
1169                if self.state_filename: self.write_state()
1170                self.state_lock.release()
1171
1172                log.error("Swap in failed on %s" % ",".join(failed))
1173                return
1174        else:
1175            log.info("[start_segment]: Experiment %s active" % eid)
1176
1177
1178        # Walk up tmpdir, deleting as we go
1179        if self.cleanup:
1180            log.debug("[start_experiment]: removing %s" % tmpdir)
1181            for path, dirs, files in os.walk(tmpdir, topdown=False):
1182                for f in files:
1183                    os.remove(os.path.join(path, f))
1184                for d in dirs:
1185                    os.rmdir(os.path.join(path, d))
1186            os.rmdir(tmpdir)
1187        else:
1188            log.debug("[start_experiment]: not removing %s" % tmpdir)
1189
1190        # Insert the experiment into our state and update the disk copy
1191        self.state_lock.acquire()
1192        self.state[expid]['experimentStatus'] = 'active'
1193        self.state[eid] = self.state[expid]
1194        if self.state_filename: self.write_state()
1195        self.state_lock.release()
1196        return
1197
1198
1199    def add_kit(self, e, kit):
1200        """
1201        Add a Software object created from the list of (install, location)
1202        tuples passed as kit  to the software attribute of an object e.  We
1203        do this enough to break out the code, but it's kind of a hack to
1204        avoid changing the old tuple rep.
1205        """
1206
1207        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1208
1209        if isinstance(e.software, list): e.software.extend(s)
1210        else: e.software = s
1211
1212
1213    def create_experiment_state(self, fid, req, expid, expcert, 
1214            state='starting'):
1215        """
1216        Create the initial entry in the experiment's state.  The expid and
1217        expcert are the experiment's fedid and certifacte that represents that
1218        ID, which are installed in the experiment state.  If the request
1219        includes a suggested local name that is used if possible.  If the local
1220        name is already taken by an experiment owned by this user that has
1221        failed, it is overwritten.  Otherwise new letters are added until a
1222        valid localname is found.  The generated local name is returned.
1223        """
1224
1225        if req.has_key('experimentID') and \
1226                req['experimentID'].has_key('localname'):
1227            overwrite = False
1228            eid = req['experimentID']['localname']
1229            # If there's an old failed experiment here with the same local name
1230            # and accessible by this user, we'll overwrite it, otherwise we'll
1231            # fall through and do the collision avoidance.
1232            old_expid = self.get_experiment_fedid(eid)
1233            if old_expid and self.check_experiment_access(fid, old_expid):
1234                self.state_lock.acquire()
1235                status = self.state[eid].get('experimentStatus', None)
1236                if status and status == 'failed':
1237                    # remove the old access attribute
1238                    self.auth.unset_attribute(fid, old_expid)
1239                    overwrite = True
1240                    del self.state[eid]
1241                    del self.state[old_expid]
1242                self.state_lock.release()
1243            self.state_lock.acquire()
1244            while (self.state.has_key(eid) and not overwrite):
1245                eid += random.choice(string.ascii_letters)
1246            # Initial state
1247            self.state[eid] = {
1248                    'experimentID' : \
1249                            [ { 'localname' : eid }, {'fedid': expid } ],
1250                    'experimentStatus': state,
1251                    'experimentAccess': { 'X509' : expcert },
1252                    'owner': fid,
1253                    'log' : [],
1254                }
1255            self.state[expid] = self.state[eid]
1256            if self.state_filename: self.write_state()
1257            self.state_lock.release()
1258        else:
1259            eid = self.exp_stem
1260            for i in range(0,5):
1261                eid += random.choice(string.ascii_letters)
1262            self.state_lock.acquire()
1263            while (self.state.has_key(eid)):
1264                eid = self.exp_stem
1265                for i in range(0,5):
1266                    eid += random.choice(string.ascii_letters)
1267            # Initial state
1268            self.state[eid] = {
1269                    'experimentID' : \
1270                            [ { 'localname' : eid }, {'fedid': expid } ],
1271                    'experimentStatus': state,
1272                    'experimentAccess': { 'X509' : expcert },
1273                    'owner': fid,
1274                    'log' : [],
1275                }
1276            self.state[expid] = self.state[eid]
1277            if self.state_filename: self.write_state()
1278            self.state_lock.release()
1279
1280        return eid
1281
1282
1283    def allocate_ips_to_topo(self, top):
1284        """
1285        Add an ip4_address attribute to all the hosts in the topology, based on
1286        the shared substrates on which they sit.  An /etc/hosts file is also
1287        created and returned as a list of hostfiles entries.  We also return
1288        the allocator, because we may need to allocate IPs to portals
1289        (specifically DRAGON portals).
1290        """
1291        subs = sorted(top.substrates, 
1292                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1293                reverse=True)
1294        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1295        ifs = { }
1296        hosts = [ ]
1297
1298        for idx, s in enumerate(subs):
1299            a = ips.allocate(len(s.interfaces)+2)
1300            if a :
1301                base, num = a
1302                if num < len(s.interfaces) +2 : 
1303                    raise service_error(service_error.internal,
1304                            "Allocator returned wrong number of IPs??")
1305            else:
1306                raise service_error(service_error.req, 
1307                        "Cannot allocate IP addresses")
1308
1309            base += 1
1310            for i in s.interfaces:
1311                i.attribute.append(
1312                        topdl.Attribute('ip4_address', 
1313                            "%s" % ip_addr(base)))
1314                hname = i.element.name[0]
1315                if ifs.has_key(hname):
1316                    hosts.append("%s\t%s-%s %s-%d" % \
1317                            (ip_addr(base), hname, s.name, hname,
1318                                ifs[hname]))
1319                else:
1320                    ifs[hname] = 0
1321                    hosts.append("%s\t%s-%s %s-%d %s" % \
1322                            (ip_addr(base), hname, s.name, hname,
1323                                ifs[hname], hname))
1324
1325                ifs[hname] += 1
1326                base += 1
1327        return hosts, ips
1328
1329    def get_access_to_testbeds(self, testbeds, user, access_user, 
1330            export_project, master, allocated, tbparams):
1331        """
1332        Request access to the various testbeds required for this instantiation
1333        (passed in as testbeds).  User, access_user, expoert_project and master
1334        are used to construct the correct requests.  Per-testbed parameters are
1335        returned in tbparams.
1336        """
1337        for tb in testbeds:
1338            self.get_access(tb, None, user, tbparams, master,
1339                    export_project, access_user)
1340            allocated[tb] = 1
1341
1342    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1343        """
1344        Create the sub-topologies that are needed for experimetn instantiation.
1345        Along the way attach startup commands to the computers in the
1346        subtopologies.
1347        """
1348        for tb in testbeds:
1349            topo[tb] = top.clone()
1350            to_delete = [ ]
1351            for e in topo[tb].elements:
1352                etb = e.get_attribute('testbed')
1353                if etb and etb != tb:
1354                    for i in e.interface:
1355                        for s in i.subs:
1356                            try:
1357                                s.interfaces.remove(i)
1358                            except ValueError:
1359                                raise service_error(service_error.internal,
1360                                        "Can't remove interface??")
1361                    to_delete.append(e)
1362            for e in to_delete:
1363                topo[tb].elements.remove(e)
1364            topo[tb].make_indices()
1365
1366            for e in [ e for e in topo[tb].elements \
1367                    if isinstance(e,topdl.Computer)]:
1368                if tb == master:
1369                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1370                else:
1371                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1372                scmd = e.get_attribute('startup')
1373                if scmd:
1374                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1375
1376                e.set_attribute('startup', cmd)
1377                if self.fedkit: self.add_kit(e, self.fedkit)
1378
1379    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1380            portal_type, iface_desc=()):
1381        sproject = tbparams[st].get('project', 'project')
1382        dproject = tbparams[dt].get('project', 'project')
1383        mproject = tbparams[master].get('project', 'project')
1384        sdomain = tbparams[st].get('domain', ".example.com")
1385        ddomain = tbparams[dt].get('domain', ".example.com")
1386        mdomain = tbparams[master].get('domain', '.example.com')
1387        muser = tbparams[master].get('user', 'root')
1388        smbshare = tbparams[master].get('smbshare', 'USERS')
1389        aid = tbparams[dt]['allocID']['fedid']
1390        if st == master or dt == master:
1391            active = ("%s" % (st == master))
1392        else:
1393            active = ("%s" %(st > dt))
1394
1395        ifaces = [ ]
1396        for sub, attrs in iface_desc:
1397            inf = topdl.Interface(
1398                    substrate=sub,
1399                    attribute=[
1400                        topdl.Attribute(
1401                            attribute=n,
1402                            value = v)
1403                        for n, v in attrs
1404                        ]
1405                    )
1406            ifaces.append(inf)
1407        return topdl.Computer(
1408                name=myname,
1409                attribute=[ 
1410                    topdl.Attribute(attribute=n,value=v)
1411                        for n, v in (\
1412                            ('portal', 'true'),
1413                            ('domain', sdomain),
1414                            ('masterdomain', mdomain),
1415                            ('masterexperiment', "%s/%s" % \
1416                                    (mproject, eid)),
1417                            ('masteruser', muser),
1418                            ('smbshare', smbshare),
1419                            ('experiment', "%s/%s" % \
1420                                    (sproject, eid)),
1421                            ('peer', "%s" % desthost),
1422                            ('peer_segment', "%s" % aid),
1423                            ('scriptdir', 
1424                                "/usr/local/federation/bin"),
1425                            ('active', "%s" % active),
1426                            ('portal_type', portal_type), 
1427                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1428                    ],
1429                interface=ifaces,
1430                )
1431
1432    def new_portal_substrate(self, st, dt, eid, tbparams):
1433        ddomain = tbparams[dt].get('domain', ".example.com")
1434        dproject = tbparams[dt].get('project', 'project')
1435        tsubstrate = \
1436                topdl.Substrate(name='%s-%s' % (st, dt),
1437                        attribute= [
1438                            topdl.Attribute(
1439                                attribute='portal',
1440                                value='true')
1441                            ]
1442                        )
1443        segment_element = topdl.Segment(
1444                id= tbparams[dt]['allocID'],
1445                type='emulab',
1446                uri = self.tbmap.get(dt, None),
1447                interface=[ 
1448                    topdl.Interface(
1449                        substrate=tsubstrate.name),
1450                    ],
1451                attribute = [
1452                    topdl.Attribute(attribute=n, value=v)
1453                        for n, v in (\
1454                            ('domain', ddomain),
1455                            ('experiment', "%s/%s" % \
1456                                    (dproject, eid)),)
1457                    ],
1458                )
1459
1460        return (tsubstrate, segment_element)
1461
1462    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1463        if sub.capacity is None:
1464            raise service_error(service_error.internal,
1465                    "Cannot DRAGON split substrate w/o capacity")
1466        segs = [ ]
1467        substr = topdl.Substrate(name="dragon%d" % idx, 
1468                capacity=sub.capacity.clone(),
1469                attribute=[ topdl.Attribute(attribute=n, value=v)
1470                    for n, v, in (\
1471                            ('vlan', 'unassigned%d' % idx),)])
1472        for tb in tbs.keys():
1473            seg = topdl.Segment(
1474                    id = tbparams[tb]['allocID'],
1475                    type='emulab',
1476                    uri = self.tbmap.get(tb, None),
1477                    interface=[ 
1478                        topdl.Interface(
1479                            substrate=substr.name),
1480                        ],
1481                    attribute=[ topdl.Attribute(
1482                        attribute='dragon_endpoint', 
1483                        value=tbparams[tb]['dragon']),
1484                        ]
1485                    )
1486            if tbparams[tb].has_key('vlans'):
1487                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1488            segs.append(seg)
1489
1490        topo["dragon%d" %idx] = \
1491                topdl.Topology(substrates=[substr], elements=segs,
1492                        attribute=[
1493                            topdl.Attribute(attribute="transit", value='true'),
1494                            topdl.Attribute(attribute="dynamic", value='true'),
1495                            topdl.Attribute(attribute="testbed", value='dragon'),
1496                            ]
1497                        )
1498
1499    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1500        """
1501        Add attribiutes to the various elements indicating that they are to be
1502        dragon connected and create a dragon segment in tops to be
1503        instantiated.
1504        """
1505
1506        def get_substrate_from_topo(name, t):
1507            for s in t.substrates:
1508                if s.name == name: return s
1509            else: return None
1510
1511        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1512        elements = [ i.element for i in sub.interfaces ]
1513        count = { }
1514        for e in elements:
1515            tb = e.get_attribute('testbed')
1516            count[tb] = count.get(tb, 0) + 1
1517
1518        for tb in tbs.keys():
1519            s = get_substrate_from_topo(sub.name, topo[tb])
1520            if s:
1521                for i in s.interfaces:
1522                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1523                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1524                    else: i.set_attribute('dragon_type', 'link')
1525            else:
1526                raise service_error(service_error.internal,
1527                        "No substrate %s in testbed %s" % (sub.name, tb))
1528
1529        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1530
1531    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1532            segment_substrate, portals):
1533        # More than one testbed is on this substrate.  Insert
1534        # some portals into the subtopologies.  st == source testbed,
1535        # dt == destination testbed.
1536        for st in tbs.keys():
1537            if not segment_substrate.has_key(st):
1538                segment_substrate[st] = { }
1539            if not portals.has_key(st): 
1540                portals[st] = { }
1541            for dt in [ t for t in tbs.keys() if t != st]:
1542                sproject = tbparams[st].get('project', 'project')
1543                dproject = tbparams[dt].get('project', 'project')
1544                mproject = tbparams[master].get('project', 'project')
1545                sdomain = tbparams[st].get('domain', ".example.com")
1546                ddomain = tbparams[dt].get('domain', ".example.com")
1547                mdomain = tbparams[master].get('domain', '.example.com')
1548                muser = tbparams[master].get('user', 'root')
1549                smbshare = tbparams[master].get('smbshare', 'USERS')
1550                aid = tbparams[dt]['allocID']['fedid']
1551                if st == master or dt == master:
1552                    active = ("%s" % (st == master))
1553                else:
1554                    active = ("%s" %(st > dt))
1555                if not segment_substrate[st].has_key(dt):
1556                    # Put a substrate and a segment for the connected
1557                    # testbed in there.
1558                    tsubstrate, segment_element = \
1559                            self.new_portal_substrate(st, dt, eid, tbparams)
1560                    segment_substrate[st][dt] = tsubstrate
1561                    topo[st].substrates.append(tsubstrate)
1562                    topo[st].elements.append(segment_element)
1563
1564                new_portal = False
1565                if portals[st].has_key(dt):
1566                    # There's a portal set up to go to this destination.
1567                    # See if there's room to multiples this connection on
1568                    # it.  If so, add an interface to the portal; if not,
1569                    # set up to add a portal below.
1570                    # [This little festival of braces is just a pop of the
1571                    # last element in the list of portals between st and
1572                    # dt.]
1573                    portal = portals[st][dt][-1]
1574                    mux = len([ i for i in portal.interface \
1575                            if not i.get_attribute('portal')])
1576                    if mux == self.muxmax:
1577                        new_portal = True
1578                        portal_type = "experiment"
1579                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1580                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1581                    else:
1582                        new_i = topdl.Interface(
1583                                substrate=sub.name,
1584                                attribute=[ 
1585                                    topdl.Attribute(
1586                                        attribute='ip4_address', 
1587                                        value=tbs[dt]
1588                                    )
1589                                ])
1590                        portal.interface.append(new_i)
1591                else:
1592                    # First connection to this testbed, make an empty list
1593                    # and set up to add the new portal below
1594                    new_portal = True
1595                    portals[st][dt] = [ ]
1596                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1597                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1598
1599                    if dt == master or st == master: portal_type = "both"
1600                    else: portal_type = "experiment"
1601
1602                if new_portal:
1603                    infs = (
1604                            (segment_substrate[st][dt].name, 
1605                                (('portal', 'true'),)),
1606                            (sub.name, 
1607                                (('ip4_address', tbs[dt]),))
1608                        )
1609                    portal =  self.new_portal_node(st, dt, tbparams, 
1610                            master, eid, myname, desthost, portal_type,
1611                            infs)
1612                    if self.fedkit:
1613                        self.add_kit(portal, self.fedkit)
1614                    if self.gatewaykit: 
1615                        self.add_kit(portal, self.gatewaykit)
1616
1617                    topo[st].elements.append(portal)
1618                    portals[st][dt].append(portal)
1619
1620    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1621        # Add to the master testbed
1622        tsubstrate, segment_element = \
1623                self.new_portal_substrate(st, dt, eid, tbparams)
1624        myname = "%stunnel" % dt
1625        desthost = "%stunnel" % st
1626
1627        portal = self.new_portal_node(st, dt, tbparams, master,
1628                eid, myname, desthost, "control", 
1629                ((tsubstrate.name,(('portal','true'),)),))
1630        if self.fedkit:
1631            self.add_kit(portal, self.fedkit)
1632        if self.gatewaykit: 
1633            self.add_kit(portal, self.gatewaykit)
1634
1635        topo[st].substrates.append(tsubstrate)
1636        topo[st].elements.append(segment_element)
1637        topo[st].elements.append(portal)
1638
1639    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1640            substrate, tbparams):
1641        # Add to the master testbed
1642        myname = "%stunnel" % dt
1643        desthost = "%s" % ip_addr(dip)
1644
1645        portal = self.new_portal_node(st, dt, tbparams, master,
1646                eid, myname, desthost, "control", 
1647                ((substrate.name,(
1648                    ('portal','true'),
1649                    ('ip4_address', "%s" % ip_addr(myip)),
1650                    ('dragon_vlan', 'unassigned%d' % idx),
1651                    ('dragon_type', 'link'),)),))
1652        if self.fedkit:
1653            self.add_kit(portal, self.fedkit)
1654        if self.gatewaykit: 
1655            self.add_kit(portal, self.gatewaykit)
1656
1657        return portal
1658
1659    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1660        """
1661        For each substrate in the main topology, find those that
1662        have nodes on more than one testbed.  Insert portal nodes
1663        into the copies of those substrates on the sub topologies.
1664        """
1665        segment_substrate = { }
1666        portals = { }
1667        for s in top.substrates:
1668            # tbs will contain an ip address on this subsrate that is in
1669            # each testbed.
1670            tbs = { }
1671            for i in s.interfaces:
1672                e = i.element
1673                tb = e.get_attribute('testbed')
1674                if tb and not tbs.has_key(tb):
1675                    for i in e.interface:
1676                        if s in i.subs:
1677                            tbs[tb]= i.get_attribute('ip4_address')
1678            if len(tbs) < 2:
1679                continue
1680
1681            # DRAGON will not create multi-site vlans yet
1682            if len(tbs) == 2 and \
1683                    all([tbparams[x].has_key('dragon') for x in tbs]):
1684                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1685                        master, eid)
1686            else:
1687                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1688                        eid, segment_substrate, portals)
1689
1690        # Make sure that all the slaves have a control portal back to the
1691        # master.
1692        for tb in [ t for t in tbparams.keys() if t != master ]:
1693            if len([e for e in topo[tb].elements \
1694                    if isinstance(e, topdl.Computer) and \
1695                    e.get_attribute('portal') and \
1696                    e.get_attribute('portal_type') == 'both']) == 0:
1697
1698                if tbparams[master].has_key('dragon') \
1699                        and tbparams[tb].has_key('dragon'):
1700
1701                    idx = len([x for x in topo.keys() \
1702                            if x.startswith('dragon')])
1703                    dip, leng = ip_allocator.allocate(4)
1704                    dip += 1
1705                    mip = dip+1
1706                    csub = topdl.Substrate(
1707                            name="dragon-control-%s" % tb,
1708                            capacity=topdl.Capacity(100000.0, 'max'),
1709                            attribute=[
1710                                topdl.Attribute(
1711                                    attribute='portal',
1712                                    value='true'
1713                                    )
1714                                ]
1715                            )
1716                    seg = topdl.Segment(
1717                            id= tbparams[master]['allocID'],
1718                            type='emulab',
1719                            uri = self.tbmap.get(master, None),
1720                            interface=[ 
1721                                topdl.Interface(
1722                                    substrate=csub.name),
1723                                ],
1724                            attribute = [
1725                                topdl.Attribute(attribute=n, value=v)
1726                                    for n, v in (\
1727                                        ('domain', 
1728                                            tbparams[master].get('domain',
1729                                                ".example.com")),
1730                                        ('experiment', "%s/%s" % \
1731                                                (tbparams[master].get(
1732                                                    'project', 
1733                                                    'project'), 
1734                                                    eid)),)
1735                                ],
1736                            )
1737                    topo[tb].substrates.append(csub)
1738                    topo[tb].elements.append(
1739                            self.new_dragon_portal(tb, master, master, eid, 
1740                                dip, mip, idx, csub, tbparams))
1741                    topo[tb].elements.append(seg)
1742
1743                    mcsub = csub.clone()
1744                    seg = topdl.Segment(
1745                            id= tbparams[tb]['allocID'],
1746                            type='emulab',
1747                            uri = self.tbmap.get(tb, None),
1748                            interface=[ 
1749                                topdl.Interface(
1750                                    substrate=csub.name),
1751                                ],
1752                            attribute = [
1753                                topdl.Attribute(attribute=n, value=v)
1754                                    for n, v in (\
1755                                        ('domain', 
1756                                            tbparams[tb].get('domain',
1757                                                ".example.com")),
1758                                        ('experiment', "%s/%s" % \
1759                                                (tbparams[tb].get('project', 
1760                                                    'project'), 
1761                                                    eid)),)
1762                                ],
1763                            )
1764                    topo[master].substrates.append(mcsub)
1765                    topo[master].elements.append(
1766                            self.new_dragon_portal(master, tb, master, eid, 
1767                                mip, dip, idx, mcsub, tbparams))
1768                    topo[master].elements.append(seg)
1769
1770                    self.create_dragon_substrate(csub, topo, 
1771                            {tb: 1, master:1}, tbparams, master, eid)
1772                else:
1773                    self.add_control_portal(master, tb, master, eid, topo, 
1774                            tbparams)
1775                    self.add_control_portal(tb, master, master, eid, topo, 
1776                            tbparams)
1777
1778        # Connect the portal nodes into the topologies and clear out
1779        # substrates that are not in the topologies
1780        for tb in tbparams.keys():
1781            topo[tb].incorporate_elements()
1782            topo[tb].substrates = \
1783                    [s for s in topo[tb].substrates \
1784                        if len(s.interfaces) >0]
1785
1786    def wrangle_software(self, expid, top, topo, tbparams):
1787        """
1788        Copy software out to the repository directory, allocate permissions and
1789        rewrite the segment topologies to look for the software in local
1790        places.
1791        """
1792
1793        # Copy the rpms and tarfiles to a distribution directory from
1794        # which the federants can retrieve them
1795        linkpath = "%s/software" %  expid
1796        softdir ="%s/%s" % ( self.repodir, linkpath)
1797        softmap = { }
1798        # These are in a list of tuples format (each kit).  This comprehension
1799        # unwraps them into a single list of tuples that initilaizes the set of
1800        # tuples.
1801        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1802                for p, t in l ])
1803        pkgs.update([x.location for e in top.elements \
1804                for x in e.software])
1805        try:
1806            os.makedirs(softdir)
1807        except IOError, e:
1808            raise service_error(
1809                    "Cannot create software directory: %s" % e)
1810        # The actual copying.  Everything's converted into a url for copying.
1811        for pkg in pkgs:
1812            loc = pkg
1813
1814            scheme, host, path = urlparse(loc)[0:3]
1815            dest = os.path.basename(path)
1816            if not scheme:
1817                if not loc.startswith('/'):
1818                    loc = "/%s" % loc
1819                loc = "file://%s" %loc
1820            try:
1821                u = urlopen(loc)
1822            except Exception, e:
1823                raise service_error(service_error.req, 
1824                        "Cannot open %s: %s" % (loc, e))
1825            try:
1826                f = open("%s/%s" % (softdir, dest) , "w")
1827                self.log.debug("Writing %s/%s" % (softdir,dest) )
1828                data = u.read(4096)
1829                while data:
1830                    f.write(data)
1831                    data = u.read(4096)
1832                f.close()
1833                u.close()
1834            except Exception, e:
1835                raise service_error(service_error.internal,
1836                        "Could not copy %s: %s" % (loc, e))
1837            path = re.sub("/tmp", "", linkpath)
1838            # XXX
1839            softmap[pkg] = \
1840                    "%s/%s/%s" %\
1841                    ( self.repo_url, path, dest)
1842
1843            # Allow the individual segments to access the software.
1844            for tb in tbparams.keys():
1845                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1846                        "/%s/%s" % ( path, dest))
1847
1848        # Convert the software locations in the segments into the local
1849        # copies on this host
1850        for soft in [ s for tb in topo.values() \
1851                for e in tb.elements \
1852                    if getattr(e, 'software', False) \
1853                        for s in e.software ]:
1854            if softmap.has_key(soft.location):
1855                soft.location = softmap[soft.location]
1856
1857
1858    def new_experiment(self, req, fid):
1859        """
1860        The external interface to empty initial experiment creation called from
1861        the dispatcher.
1862
1863        Creates a working directory, splits the incoming description using the
1864        splitter script and parses out the avrious subsections using the
1865        lcasses above.  Once each sub-experiment is created, use pooled threads
1866        to instantiate them and start it all up.
1867        """
1868        if not self.auth.check_attribute(fid, 'new'):
1869            raise service_error(service_error.access, "New access denied")
1870
1871        try:
1872            tmpdir = tempfile.mkdtemp(prefix="split-")
1873        except IOError:
1874            raise service_error(service_error.internal, "Cannot create tmp dir")
1875
1876        try:
1877            access_user = self.accessdb[fid]
1878        except KeyError:
1879            raise service_error(service_error.internal,
1880                    "Access map and authorizer out of sync in " + \
1881                            "new_experiment for fedid %s"  % fid)
1882
1883        pid = "dummy"
1884        gid = "dummy"
1885
1886        req = req.get('NewRequestBody', None)
1887        if not req:
1888            raise service_error(service_error.req,
1889                    "Bad request format (no NewRequestBody)")
1890
1891        # Generate an ID for the experiment (slice) and a certificate that the
1892        # allocator can use to prove they own it.  We'll ship it back through
1893        # the encrypted connection.
1894        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1895
1896        #now we're done with the tmpdir, and it should be empty
1897        if self.cleanup:
1898            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1899            os.rmdir(tmpdir)
1900        else:
1901            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1902
1903        eid = self.create_experiment_state(fid, req, expid, expcert, 
1904                state='empty')
1905
1906        # Let users touch the state
1907        self.auth.set_attribute(fid, expid)
1908        self.auth.set_attribute(expid, expid)
1909        # Override fedids can manipulate state as well
1910        for o in self.overrides:
1911            self.auth.set_attribute(o, expid)
1912
1913        rv = {
1914                'experimentID': [
1915                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1916                ],
1917                'experimentStatus': 'empty',
1918                'experimentAccess': { 'X509' : expcert }
1919            }
1920
1921        return rv
1922
1923
1924    def create_experiment(self, req, fid):
1925        """
1926        The external interface to experiment creation called from the
1927        dispatcher.
1928
1929        Creates a working directory, splits the incoming description using the
1930        splitter script and parses out the avrious subsections using the
1931        lcasses above.  Once each sub-experiment is created, use pooled threads
1932        to instantiate them and start it all up.
1933        """
1934
1935        req = req.get('CreateRequestBody', None)
1936        if not req:
1937            raise service_error(service_error.req,
1938                    "Bad request format (no CreateRequestBody)")
1939
1940        # Get the experiment access
1941        exp = req.get('experimentID', None)
1942        if exp:
1943            if exp.has_key('fedid'):
1944                key = exp['fedid']
1945                expid = key
1946                eid = None
1947            elif exp.has_key('localname'):
1948                key = exp['localname']
1949                eid = key
1950                expid = None
1951            else:
1952                raise service_error(service_error.req, "Unknown lookup type")
1953        else:
1954            raise service_error(service_error.req, "No request?")
1955
1956        self.check_experiment_access(fid, key)
1957
1958        try:
1959            tmpdir = tempfile.mkdtemp(prefix="split-")
1960            os.mkdir(tmpdir+"/keys")
1961        except IOError:
1962            raise service_error(service_error.internal, "Cannot create tmp dir")
1963
1964        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1965        gw_secretkey_base = "fed.%s" % self.ssh_type
1966        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1967        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1968        tclfile = tmpdir + "/experiment.tcl"
1969        tbparams = { }
1970        try:
1971            access_user = self.accessdb[fid]
1972        except KeyError:
1973            raise service_error(service_error.internal,
1974                    "Access map and authorizer out of sync in " + \
1975                            "create_experiment for fedid %s"  % fid)
1976
1977        pid = "dummy"
1978        gid = "dummy"
1979
1980        # The tcl parser needs to read a file so put the content into that file
1981        descr=req.get('experimentdescription', None)
1982        if descr:
1983            file_content=descr.get('ns2description', None)
1984            if file_content:
1985                try:
1986                    f = open(tclfile, 'w')
1987                    f.write(file_content)
1988                    f.close()
1989                except IOError:
1990                    raise service_error(service_error.internal,
1991                            "Cannot write temp experiment description")
1992            else:
1993                raise service_error(service_error.req, 
1994                        "Only ns2descriptions supported")
1995        else:
1996            raise service_error(service_error.req, "No experiment description")
1997
1998        self.state_lock.acquire()
1999        if self.state.has_key(key):
2000            for e in self.state[key].get('experimentID',[]):
2001                if not expid and e.has_key('fedid'):
2002                    expid = e['fedid']
2003                elif not eid and e.has_key('localname'):
2004                    eid = e['localname']
2005        self.state_lock.release()
2006
2007        if not (eid and expid):
2008            raise service_error(service_error.internal, 
2009                    "Cannot find local experiment info!?")
2010
2011        try: 
2012            # This catches exceptions to clear the placeholder if necessary
2013            try:
2014                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2015            except ValueError:
2016                raise service_error(service_error.server_config, 
2017                        "Bad key type (%s)" % self.ssh_type)
2018
2019            user = req.get('user', None)
2020            if user == None:
2021                raise service_error(service_error.req, "No user")
2022
2023            master = req.get('master', None)
2024            if not master:
2025                raise service_error(service_error.req,
2026                        "No master testbed label")
2027            export_project = req.get('exportProject', None)
2028            if not export_project:
2029                raise service_error(service_error.req, "No export project")
2030           
2031            # Translate to topdl
2032            if self.splitter_url:
2033                # XXX: need remote topdl translator
2034                self.log.debug("Calling remote splitter at %s" % \
2035                        self.splitter_url)
2036                split_data = self.remote_splitter(self.splitter_url,
2037                        file_content, master)
2038            else:
2039                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2040                    str(self.muxmax), '-m', master]
2041
2042                if self.fedkit:
2043                    tclcmd.append('-k')
2044
2045                if self.gatewaykit:
2046                    tclcmd.append('-K')
2047
2048                tclcmd.extend([pid, gid, eid, tclfile])
2049
2050                self.log.debug("running local splitter %s", " ".join(tclcmd))
2051                # This is just fantastic.  As a side effect the parser copies
2052                # tb_compat.tcl into the current directory, so that directory
2053                # must be writable by the fedd user.  Doing this in the
2054                # temporary subdir ensures this is the case.
2055                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2056                        cwd=tmpdir)
2057                split_data = tclparser.stdout
2058
2059            top = topdl.topology_from_xml(file=split_data, top="experiment")
2060
2061            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2062             # Find the testbeds to look up
2063            testbeds = set([ a.value for e in top.elements \
2064                    for a in e.attribute \
2065                    if a.attribute == 'testbed'] )
2066
2067            allocated = { }         # Testbeds we can access
2068            topo ={ }               # Sub topologies
2069            self.get_access_to_testbeds(testbeds, user, access_user, 
2070                    export_project, master, allocated, tbparams)
2071            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2072
2073            # Copy configuration files into the remote file store
2074            # The config urlpath
2075            configpath = "/%s/config" % expid
2076            # The config file system location
2077            configdir ="%s%s" % ( self.repodir, configpath)
2078            try:
2079                os.makedirs(configdir)
2080            except IOError, e:
2081                raise service_error(
2082                        "Cannot create config directory: %s" % e)
2083            try:
2084                f = open("%s/hosts" % configdir, "w")
2085                f.write('\n'.join(hosts))
2086                f.close()
2087            except IOError, e:
2088                raise service_error(service_error.internal, 
2089                        "Cannot write hosts file: %s" % e)
2090            try:
2091                copy_file("%s" % gw_pubkey, "%s/%s" % \
2092                        (configdir, gw_pubkey_base))
2093                copy_file("%s" % gw_secretkey, "%s/%s" % \
2094                        (configdir, gw_secretkey_base))
2095            except IOError, e:
2096                raise service_error(service_error.internal, 
2097                        "Cannot copy keyfiles: %s" % e)
2098
2099            # Allow the individual testbeds to access the configuration files.
2100            for tb in tbparams.keys():
2101                asignee = tbparams[tb]['allocID']['fedid']
2102                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2103                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2104
2105            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2106            # Now get access to the dynamic testbeds
2107            for k, t in topo.items():
2108                if not t.get_attribute('dynamic'):
2109                    continue
2110                tb = t.get_attribute('testbed')
2111                if tb: 
2112                    self.get_access(tb, None, user, tbparams, master, 
2113                            export_project, access_user)
2114                    tbparams[k] = tbparams[tb]
2115                    del tbparams[tb]
2116                    allocated[k] = 1
2117                else:
2118                    raise service_error(service_error.internal, 
2119                            "Dynamic allocation from no testbed!?")
2120
2121            self.wrangle_software(expid, top, topo, tbparams)
2122
2123            vtopo = topdl.topology_to_vtopo(top)
2124            vis = self.genviz(vtopo)
2125
2126            # save federant information
2127            for k in allocated.keys():
2128                tbparams[k]['federant'] = {
2129                        'name': [ { 'localname' : eid} ],
2130                        'allocID' : tbparams[k]['allocID'],
2131                        'master' : k == master,
2132                        'uri': tbparams[k]['uri'],
2133                    }
2134                if tbparams[k].has_key('emulab'):
2135                        tbparams[k]['federant']['emulab'] = \
2136                                tbparams[k]['emulab']
2137
2138            self.state_lock.acquire()
2139            self.state[eid]['vtopo'] = vtopo
2140            self.state[eid]['vis'] = vis
2141            self.state[expid]['federant'] = \
2142                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2143                        if tbparams[tb].has_key('federant') ]
2144            if self.state_filename: 
2145                self.write_state()
2146            self.state_lock.release()
2147        except service_error, e:
2148            # If something goes wrong in the parse (usually an access error)
2149            # clear the placeholder state.  From here on out the code delays
2150            # exceptions.  Failing at this point returns a fault to the remote
2151            # caller.
2152
2153            self.state_lock.acquire()
2154            del self.state[eid]
2155            del self.state[expid]
2156            if self.state_filename: self.write_state()
2157            self.state_lock.release()
2158            raise e
2159
2160
2161        # Start the background swapper and return the starting state.  From
2162        # here on out, the state will stick around a while.
2163
2164        # Let users touch the state
2165        self.auth.set_attribute(fid, expid)
2166        self.auth.set_attribute(expid, expid)
2167        # Override fedids can manipulate state as well
2168        for o in self.overrides:
2169            self.auth.set_attribute(o, expid)
2170
2171        # Create a logger that logs to the experiment's state object as well as
2172        # to the main log file.
2173        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2174        alloc_collector = self.list_log(self.state[eid]['log'])
2175        h = logging.StreamHandler(alloc_collector)
2176        # XXX: there should be a global one of these rather than repeating the
2177        # code.
2178        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2179                    '%d %b %y %H:%M:%S'))
2180        alloc_log.addHandler(h)
2181       
2182        attrs = [ 
2183                {
2184                    'attribute': 'ssh_pubkey', 
2185                    'value': '%s/%s/config/%s' % \
2186                            (self.repo_url, expid, gw_pubkey_base)
2187                },
2188                {
2189                    'attribute': 'ssh_secretkey', 
2190                    'value': '%s/%s/config/%s' % \
2191                            (self.repo_url, expid, gw_secretkey_base)
2192                },
2193                {
2194                    'attribute': 'hosts', 
2195                    'value': '%s/%s/config/hosts' % \
2196                            (self.repo_url, expid)
2197                },
2198                {
2199                    'attribute': 'experiment_name',
2200                    'value': eid,
2201                },
2202            ]
2203
2204        # Start a thread to do the resource allocation
2205        t  = Thread(target=self.allocate_resources,
2206                args=(allocated, master, eid, expid, tbparams, 
2207                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2208                name=eid)
2209        t.start()
2210
2211        rv = {
2212                'experimentID': [
2213                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2214                ],
2215                'experimentStatus': 'starting',
2216            }
2217
2218        return rv
2219   
2220    def get_experiment_fedid(self, key):
2221        """
2222        find the fedid associated with the localname key in the state database.
2223        """
2224
2225        rv = None
2226        self.state_lock.acquire()
2227        if self.state.has_key(key):
2228            if isinstance(self.state[key], dict):
2229                try:
2230                    kl = [ f['fedid'] for f in \
2231                            self.state[key]['experimentID']\
2232                                if f.has_key('fedid') ]
2233                except KeyError:
2234                    self.state_lock.release()
2235                    raise service_error(service_error.internal, 
2236                            "No fedid for experiment %s when getting "+\
2237                                    "fedid(!?)" % key)
2238                if len(kl) == 1:
2239                    rv = kl[0]
2240                else:
2241                    self.state_lock.release()
2242                    raise service_error(service_error.internal, 
2243                            "multiple fedids for experiment %s when " +\
2244                                    "getting fedid(!?)" % key)
2245            else:
2246                self.state_lock.release()
2247                raise service_error(service_error.internal, 
2248                        "Unexpected state for %s" % key)
2249        self.state_lock.release()
2250        return rv
2251
2252    def check_experiment_access(self, fid, key):
2253        """
2254        Confirm that the fid has access to the experiment.  Though a request
2255        may be made in terms of a local name, the access attribute is always
2256        the experiment's fedid.
2257        """
2258        if not isinstance(key, fedid):
2259            key = self.get_experiment_fedid(key)
2260
2261        if self.auth.check_attribute(fid, key):
2262            return True
2263        else:
2264            raise service_error(service_error.access, "Access Denied")
2265
2266
2267    def get_handler(self, path, fid):
2268        self.log.info("Get handler %s %s" % (path, fid))
2269        if self.auth.check_attribute(fid, path):
2270            return ("%s/%s" % (self.repodir, path), "application/binary")
2271        else:
2272            return (None, None)
2273
2274    def get_vtopo(self, req, fid):
2275        """
2276        Return the stored virtual topology for this experiment
2277        """
2278        rv = None
2279        state = None
2280
2281        req = req.get('VtopoRequestBody', None)
2282        if not req:
2283            raise service_error(service_error.req,
2284                    "Bad request format (no VtopoRequestBody)")
2285        exp = req.get('experiment', None)
2286        if exp:
2287            if exp.has_key('fedid'):
2288                key = exp['fedid']
2289                keytype = "fedid"
2290            elif exp.has_key('localname'):
2291                key = exp['localname']
2292                keytype = "localname"
2293            else:
2294                raise service_error(service_error.req, "Unknown lookup type")
2295        else:
2296            raise service_error(service_error.req, "No request?")
2297
2298        self.check_experiment_access(fid, key)
2299
2300        self.state_lock.acquire()
2301        if self.state.has_key(key):
2302            if self.state[key].has_key('vtopo'):
2303                rv = { 'experiment' : {keytype: key },\
2304                        'vtopo': self.state[key]['vtopo'],\
2305                    }
2306            else:
2307                state = self.state[key]['experimentStatus']
2308        self.state_lock.release()
2309
2310        if rv: return rv
2311        else: 
2312            if state:
2313                raise service_error(service_error.partial, 
2314                        "Not ready: %s" % state)
2315            else:
2316                raise service_error(service_error.req, "No such experiment")
2317
2318    def get_vis(self, req, fid):
2319        """
2320        Return the stored visualization for this experiment
2321        """
2322        rv = None
2323        state = None
2324
2325        req = req.get('VisRequestBody', None)
2326        if not req:
2327            raise service_error(service_error.req,
2328                    "Bad request format (no VisRequestBody)")
2329        exp = req.get('experiment', None)
2330        if exp:
2331            if exp.has_key('fedid'):
2332                key = exp['fedid']
2333                keytype = "fedid"
2334            elif exp.has_key('localname'):
2335                key = exp['localname']
2336                keytype = "localname"
2337            else:
2338                raise service_error(service_error.req, "Unknown lookup type")
2339        else:
2340            raise service_error(service_error.req, "No request?")
2341
2342        self.check_experiment_access(fid, key)
2343
2344        self.state_lock.acquire()
2345        if self.state.has_key(key):
2346            if self.state[key].has_key('vis'):
2347                rv =  { 'experiment' : {keytype: key },\
2348                        'vis': self.state[key]['vis'],\
2349                        }
2350            else:
2351                state = self.state[key]['experimentStatus']
2352        self.state_lock.release()
2353
2354        if rv: return rv
2355        else:
2356            if state:
2357                raise service_error(service_error.partial, 
2358                        "Not ready: %s" % state)
2359            else:
2360                raise service_error(service_error.req, "No such experiment")
2361
2362    def clean_info_response(self, rv):
2363        """
2364        Remove the information in the experiment's state object that is not in
2365        the info response.
2366        """
2367        # Remove the owner info (should always be there, but...)
2368        if rv.has_key('owner'): del rv['owner']
2369
2370        # Convert the log into the allocationLog parameter and remove the
2371        # log entry (with defensive programming)
2372        if rv.has_key('log'):
2373            rv['allocationLog'] = "".join(rv['log'])
2374            del rv['log']
2375        else:
2376            rv['allocationLog'] = ""
2377
2378        if rv['experimentStatus'] != 'active':
2379            if rv.has_key('federant'): del rv['federant']
2380        else:
2381            # remove the allocationID and uri info from each federant
2382            for f in rv.get('federant', []):
2383                if f.has_key('allocID'): del f['allocID']
2384                if f.has_key('uri'): del f['uri']
2385        return rv
2386
2387    def get_info(self, req, fid):
2388        """
2389        Return all the stored info about this experiment
2390        """
2391        rv = None
2392
2393        req = req.get('InfoRequestBody', None)
2394        if not req:
2395            raise service_error(service_error.req,
2396                    "Bad request format (no InfoRequestBody)")
2397        exp = req.get('experiment', None)
2398        if exp:
2399            if exp.has_key('fedid'):
2400                key = exp['fedid']
2401                keytype = "fedid"
2402            elif exp.has_key('localname'):
2403                key = exp['localname']
2404                keytype = "localname"
2405            else:
2406                raise service_error(service_error.req, "Unknown lookup type")
2407        else:
2408            raise service_error(service_error.req, "No request?")
2409
2410        self.check_experiment_access(fid, key)
2411
2412        # The state may be massaged by the service function that called
2413        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2414        # state.
2415        self.state_lock.acquire()
2416        if self.state.has_key(key):
2417            rv = copy.deepcopy(self.state[key])
2418        self.state_lock.release()
2419
2420        if rv:
2421            return self.clean_info_response(rv)
2422        else:
2423            raise service_error(service_error.req, "No such experiment")
2424
2425    def get_multi_info(self, req, fid):
2426        """
2427        Return all the stored info that this fedid can access
2428        """
2429        rv = { 'info': [ ] }
2430
2431        self.state_lock.acquire()
2432        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2433            try:
2434                self.check_experiment_access(fid, key)
2435            except service_error, e:
2436                if e.code == service_error.access:
2437                    continue
2438                else:
2439                    self.state_lock.release()
2440                    raise e
2441
2442            if self.state.has_key(key):
2443                e = copy.deepcopy(self.state[key])
2444                e = self.clean_info_response(e)
2445                rv['info'].append(e)
2446        self.state_lock.release()
2447        return rv
2448
2449    def terminate_experiment(self, req, fid):
2450        """
2451        Swap this experiment out on the federants and delete the shared
2452        information
2453        """
2454        tbparams = { }
2455        req = req.get('TerminateRequestBody', None)
2456        if not req:
2457            raise service_error(service_error.req,
2458                    "Bad request format (no TerminateRequestBody)")
2459        force = req.get('force', False)
2460        exp = req.get('experiment', None)
2461        if exp:
2462            if exp.has_key('fedid'):
2463                key = exp['fedid']
2464                keytype = "fedid"
2465            elif exp.has_key('localname'):
2466                key = exp['localname']
2467                keytype = "localname"
2468            else:
2469                raise service_error(service_error.req, "Unknown lookup type")
2470        else:
2471            raise service_error(service_error.req, "No request?")
2472
2473        self.check_experiment_access(fid, key)
2474
2475        dealloc_list = [ ]
2476
2477
2478        # Create a logger that logs to the dealloc_list as well as to the main
2479        # log file.
2480        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2481        h = logging.StreamHandler(self.list_log(dealloc_list))
2482        # XXX: there should be a global one of these rather than repeating the
2483        # code.
2484        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2485                    '%d %b %y %H:%M:%S'))
2486        dealloc_log.addHandler(h)
2487
2488        self.state_lock.acquire()
2489        fed_exp = self.state.get(key, None)
2490
2491        if fed_exp:
2492            # This branch of the conditional holds the lock to generate a
2493            # consistent temporary tbparams variable to deallocate experiments.
2494            # It releases the lock to do the deallocations and reacquires it to
2495            # remove the experiment state when the termination is complete.
2496
2497            # First make sure that the experiment creation is complete.
2498            status = fed_exp.get('experimentStatus', None)
2499
2500            if status:
2501                if status in ('starting', 'terminating'):
2502                    if not force:
2503                        self.state_lock.release()
2504                        raise service_error(service_error.partial, 
2505                                'Experiment still being created or destroyed')
2506                    else:
2507                        self.log.warning('Experiment in %s state ' % status + \
2508                                'being terminated by force.')
2509            else:
2510                # No status??? trouble
2511                self.state_lock.release()
2512                raise service_error(service_error.internal,
2513                        "Experiment has no status!?")
2514
2515            ids = []
2516            #  experimentID is a list of dicts that are self-describing
2517            #  identifiers.  This finds all the fedids and localnames - the
2518            #  keys of self.state - and puts them into ids.
2519            for id in fed_exp.get('experimentID', []):
2520                if id.has_key('fedid'): ids.append(id['fedid'])
2521                if id.has_key('localname'): ids.append(id['localname'])
2522
2523            # Collect the allocation/segment ids into a dict keyed by the fedid
2524            # of the allocation (or a monotonically increasing integer) that
2525            # contains a tuple of uri, aid (which is a dict...)
2526            for i, fed in enumerate(fed_exp.get('federant', [])):
2527                try:
2528                    uri = fed['uri']
2529                    aid = fed['allocID']
2530                    k = fed['allocID'].get('fedid', i)
2531                except KeyError, e:
2532                    continue
2533                tbparams[k] = (uri, aid)
2534            fed_exp['experimentStatus'] = 'terminating'
2535            if self.state_filename: self.write_state()
2536            self.state_lock.release()
2537
2538            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2539            # then completes, so we can't wait if nothing starts.  So, no
2540            # tbparams, no start.
2541            if len(tbparams) > 0:
2542                thread_pool = self.thread_pool(self.nthreads)
2543                for k in tbparams.keys():
2544                    # Create and start a thread to stop the segment
2545                    thread_pool.wait_for_slot()
2546                    uri, aid = tbparams[k]
2547                    t  = self.pooled_thread(\
2548                            target=self.terminate_segment(log=dealloc_log,
2549                                testbed=uri,
2550                                cert_file=self.cert_file, 
2551                                cert_pwd=self.cert_pwd,
2552                                trusted_certs=self.trusted_certs,
2553                                caller=self.call_TerminateSegment),
2554                            args=(uri, aid), name=k,
2555                            pdata=thread_pool, trace_file=self.trace_file)
2556                    t.start()
2557                # Wait for completions
2558                thread_pool.wait_for_all_done()
2559
2560            # release the allocations (failed experiments have done this
2561            # already, and starting experiments may be in odd states, so we
2562            # ignore errors releasing those allocations
2563            try: 
2564                for k in tbparams.keys():
2565                    # This releases access by uri
2566                    uri, aid = tbparams[k]
2567                    self.release_access(None, aid, uri=uri)
2568            except service_error, e:
2569                if status != 'failed' and not force:
2570                    raise e
2571
2572            # Remove the terminated experiment
2573            self.state_lock.acquire()
2574            for id in ids:
2575                if self.state.has_key(id): del self.state[id]
2576
2577            if self.state_filename: self.write_state()
2578            self.state_lock.release()
2579
2580            return { 
2581                    'experiment': exp , 
2582                    'deallocationLog': "".join(dealloc_list),
2583                    }
2584        else:
2585            # Don't forget to release the lock
2586            self.state_lock.release()
2587            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.