source: fedd/federation/experiment_control.py @ c326346

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since c326346 was 829246e, checked in by Ted Faber <faber@…>, 15 years ago

Merge from 1.30: Multi info was all screwed up if the requester wasn't
authorized to see any one of the experiments. This gracefully handles the
exceptions that check_experiment_access throws. The 1.30 change included
making some Locks RLocks, but I don't think that's required.

  • Property mode set to 100644
File size: 86.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221
222        self.exp_stem = "fed-stem"
223        self.log = logging.getLogger("fedd.experiment_control")
224        set_log_level(config, "experiment_control", self.log)
225        self.muxmax = 2
226        self.nthreads = 2
227        self.randomize_experiments = False
228
229        self.splitter = None
230        self.ssh_keygen = "/usr/bin/ssh-keygen"
231        self.ssh_identity_file = None
232
233
234        self.debug = config.getboolean("experiment_control", "create_debug")
235        self.cleanup = not config.getboolean("experiment_control", 
236                "leave_tmpfiles")
237        self.state_filename = config.get("experiment_control", 
238                "experiment_state")
239        self.splitter_url = config.get("experiment_control", "splitter_uri")
240        self.fedkit = parse_tarfile_list(\
241                config.get("experiment_control", "fedkit"))
242        self.gatewaykit = parse_tarfile_list(\
243                config.get("experiment_control", "gatewaykit"))
244        accessdb_file = config.get("experiment_control", "accessdb")
245
246        self.ssh_pubkey_file = config.get("experiment_control", 
247                "ssh_pubkey_file")
248        self.ssh_privkey_file = config.get("experiment_control",
249                "ssh_privkey_file")
250        # NB for internal master/slave ops, not experiment setup
251        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
252
253        self.overrides = set([])
254        ovr = config.get('experiment_control', 'overrides')
255        if ovr:
256            for o in ovr.split(","):
257                o = o.strip()
258                if o.startswith('fedid:'): o = o[len('fedid:'):]
259                self.overrides.add(fedid(hexstr=o))
260
261        self.state = { }
262        self.state_lock = Lock()
263        self.tclsh = "/usr/local/bin/otclsh"
264        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
265                config.get("experiment_control", "tcl_splitter",
266                        "/usr/testbed/lib/ns2ir/parse.tcl")
267        mapdb_file = config.get("experiment_control", "mapdb")
268        self.trace_file = sys.stderr
269
270        self.def_expstart = \
271                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
272                "/tmp/federate";
273        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
274                "FEDDIR/hosts";
275        self.def_gwstart = \
276                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
277                "/tmp/bridge.log";
278        self.def_mgwstart = \
279                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
280                "/tmp/bridge.log";
281        self.def_gwimage = "FBSD61-TUNNEL2";
282        self.def_gwtype = "pc";
283        self.local_access = { }
284
285        if auth:
286            self.auth = auth
287        else:
288            self.log.error(\
289                    "[access]: No authorizer initialized, creating local one.")
290            auth = authorizer()
291
292
293        if self.ssh_pubkey_file:
294            try:
295                f = open(self.ssh_pubkey_file, 'r')
296                self.ssh_pubkey = f.read()
297                f.close()
298            except IOError:
299                raise service_error(service_error.internal,
300                        "Cannot read sshpubkey")
301        else:
302            raise service_error(service_error.internal, 
303                    "No SSH public key file?")
304
305        if not self.ssh_privkey_file:
306            raise service_error(service_error.internal, 
307                    "No SSH public key file?")
308
309
310        if mapdb_file:
311            self.read_mapdb(mapdb_file)
312        else:
313            self.log.warn("[experiment_control] No testbed map, using defaults")
314            self.tbmap = { 
315                    'deter':'https://users.isi.deterlab.net:23235',
316                    'emulab':'https://users.isi.deterlab.net:23236',
317                    'ucb':'https://users.isi.deterlab.net:23237',
318                    }
319
320        if accessdb_file:
321                self.read_accessdb(accessdb_file)
322        else:
323            raise service_error(service_error.internal,
324                    "No accessdb specified in config")
325
326        # Grab saved state.  OK to do this w/o locking because it's read only
327        # and only one thread should be in existence that can see self.state at
328        # this point.
329        if self.state_filename:
330            self.read_state()
331
332        # Dispatch tables
333        self.soap_services = {\
334                'Create': soap_handler('Create', self.create_experiment),
335                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
336                'Vis': soap_handler('Vis', self.get_vis),
337                'Info': soap_handler('Info', self.get_info),
338                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
339                'Terminate': soap_handler('Terminate', 
340                    self.terminate_experiment),
341        }
342
343        self.xmlrpc_services = {\
344                'Create': xmlrpc_handler('Create', self.create_experiment),
345                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
346                'Vis': xmlrpc_handler('Vis', self.get_vis),
347                'Info': xmlrpc_handler('Info', self.get_info),
348                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
349                'Terminate': xmlrpc_handler('Terminate',
350                    self.terminate_experiment),
351        }
352
353    # Call while holding self.state_lock
354    def write_state(self):
355        """
356        Write a new copy of experiment state after copying the existing state
357        to a backup.
358
359        State format is a simple pickling of the state dictionary.
360        """
361        if os.access(self.state_filename, os.W_OK):
362            copy_file(self.state_filename, \
363                    "%s.bak" % self.state_filename)
364        try:
365            f = open(self.state_filename, 'w')
366            pickle.dump(self.state, f)
367        except IOError, e:
368            self.log.error("Can't write file %s: %s" % \
369                    (self.state_filename, e))
370        except pickle.PicklingError, e:
371            self.log.error("Pickling problem: %s" % e)
372        except TypeError, e:
373            self.log.error("Pickling problem (TypeError): %s" % e)
374
375    # Call while holding self.state_lock
376    def read_state(self):
377        """
378        Read a new copy of experiment state.  Old state is overwritten.
379
380        State format is a simple pickling of the state dictionary.
381        """
382       
383        def get_experiment_id(state):
384            """
385            Pull the fedid experimentID out of the saved state.  This is kind
386            of a gross walk through the dict.
387            """
388
389            if state.has_key('experimentID'):
390                for e in state['experimentID']:
391                    if e.has_key('fedid'):
392                        return e['fedid']
393                else:
394                    return None
395            else:
396                return None
397
398        def get_alloc_ids(state):
399            """
400            Pull the fedids of the identifiers of each allocation from the
401            state.  Again, a dict dive that's best isolated.
402            """
403
404            return [ f['allocID']['fedid'] 
405                    for f in state.get('federant',[]) \
406                        if f.has_key('allocID') and \
407                            f['allocID'].has_key('fedid')]
408
409
410        try:
411            f = open(self.state_filename, "r")
412            self.state = pickle.load(f)
413            self.log.debug("[read_state]: Read state from %s" % \
414                    self.state_filename)
415        except IOError, e:
416            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
417                    % (self.state_filename, e))
418        except pickle.UnpicklingError, e:
419            self.log.warning(("[read_state]: No saved state: " + \
420                    "Unpickling failed: %s") % e)
421       
422        for s in self.state.values():
423            try:
424
425                eid = get_experiment_id(s)
426                if eid : 
427                    # Give the owner rights to the experiment
428                    self.auth.set_attribute(s['owner'], eid)
429                    # And holders of the eid as well
430                    self.auth.set_attribute(eid, eid)
431                    # allow overrides to control experiments as well
432                    for o in self.overrides:
433                        self.auth.set_attribute(o, eid)
434                    # Set permissions to allow reading of the software repo, if
435                    # any, as well.
436                    for a in get_alloc_ids(s):
437                        self.auth.set_attribute(a, 'repo/%s' % eid)
438                else:
439                    raise KeyError("No experiment id")
440            except KeyError, e:
441                self.log.warning("[read_state]: State ownership or identity " +\
442                        "misformatted in %s: %s" % (self.state_filename, e))
443
444
445    def read_accessdb(self, accessdb_file):
446        """
447        Read the mapping from fedids that can create experiments to their name
448        in the 3-level access namespace.  All will be asserted from this
449        testbed and can include the local username and porject that will be
450        asserted on their behalf by this fedd.  Each fedid is also added to the
451        authorization system with the "create" attribute.
452        """
453        self.accessdb = {}
454        # These are the regexps for parsing the db
455        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
456        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
457                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
458        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
459                "\s*->\s*(" + name_expr + ")\s*$")
460        lineno = 0
461
462        # Parse the mappings and store in self.authdb, a dict of
463        # fedid -> (proj, user)
464        try:
465            f = open(accessdb_file, "r")
466            for line in f:
467                lineno += 1
468                line = line.strip()
469                if len(line) == 0 or line.startswith('#'):
470                    continue
471                m = project_line.match(line)
472                if m:
473                    fid = fedid(hexstr=m.group(1))
474                    project, user = m.group(2,3)
475                    if not self.accessdb.has_key(fid):
476                        self.accessdb[fid] = []
477                    self.accessdb[fid].append((project, user))
478                    continue
479
480                m = user_line.match(line)
481                if m:
482                    fid = fedid(hexstr=m.group(1))
483                    project = None
484                    user = m.group(2)
485                    if not self.accessdb.has_key(fid):
486                        self.accessdb[fid] = []
487                    self.accessdb[fid].append((project, user))
488                    continue
489                self.log.warn("[experiment_control] Error parsing access " +\
490                        "db %s at line %d" %  (accessdb_file, lineno))
491        except IOError:
492            raise service_error(service_error.internal,
493                    "Error opening/reading %s as experiment " +\
494                            "control accessdb" %  accessdb_file)
495        f.close()
496
497        # Initialize the authorization attributes
498        for fid in self.accessdb.keys():
499            self.auth.set_attribute(fid, 'create')
500
501    def read_mapdb(self, file):
502        """
503        Read a simple colon separated list of mappings for the
504        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
505        """
506
507        self.tbmap = { }
508        lineno =0
509        try:
510            f = open(file, "r")
511            for line in f:
512                lineno += 1
513                line = line.strip()
514                if line.startswith('#') or len(line) == 0:
515                    continue
516                try:
517                    label, url = line.split(':', 1)
518                    self.tbmap[label] = url
519                except ValueError, e:
520                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
521                            "map db: %s %s" % (lineno, line, e))
522        except IOError, e:
523            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
524                    "open %s: %s" % (file, e))
525        f.close()
526       
527    def generate_ssh_keys(self, dest, type="rsa" ):
528        """
529        Generate a set of keys for the gateways to use to talk.
530
531        Keys are of type type and are stored in the required dest file.
532        """
533        valid_types = ("rsa", "dsa")
534        t = type.lower();
535        if t not in valid_types: raise ValueError
536        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
537
538        try:
539            trace = open("/dev/null", "w")
540        except IOError:
541            raise service_error(service_error.internal,
542                    "Cannot open /dev/null??");
543
544        # May raise CalledProcessError
545        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
546        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
547        if rv != 0:
548            raise service_error(service_error.internal, 
549                    "Cannot generate nonce ssh keys.  %s return code %d" \
550                            % (self.ssh_keygen, rv))
551
552    def gentopo(self, str):
553        """
554        Generate the topology dtat structure from the splitter's XML
555        representation of it.
556
557        The topology XML looks like:
558            <experiment>
559                <nodes>
560                    <node><vname></vname><ips>ip1:ip2</ips></node>
561                </nodes>
562                <lans>
563                    <lan>
564                        <vname></vname><vnode></vnode><ip></ip>
565                        <bandwidth></bandwidth><member>node:port</member>
566                    </lan>
567                </lans>
568        """
569        class topo_parse:
570            """
571            Parse the topology XML and create the dats structure.
572            """
573            def __init__(self):
574                # Typing of the subelements for data conversion
575                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
576                self.int_subelements = ( 'bandwidth',)
577                self.float_subelements = ( 'delay',)
578                # The final data structure
579                self.nodes = [ ]
580                self.lans =  [ ]
581                self.topo = { \
582                        'node': self.nodes,\
583                        'lan' : self.lans,\
584                    }
585                self.element = { }  # Current element being created
586                self.chars = ""     # Last text seen
587
588            def end_element(self, name):
589                # After each sub element the contents is added to the current
590                # element or to the appropriate list.
591                if name == 'node':
592                    self.nodes.append(self.element)
593                    self.element = { }
594                elif name == 'lan':
595                    self.lans.append(self.element)
596                    self.element = { }
597                elif name in self.str_subelements:
598                    self.element[name] = self.chars
599                    self.chars = ""
600                elif name in self.int_subelements:
601                    self.element[name] = int(self.chars)
602                    self.chars = ""
603                elif name in self.float_subelements:
604                    self.element[name] = float(self.chars)
605                    self.chars = ""
606
607            def found_chars(self, data):
608                self.chars += data.rstrip()
609
610
611        tp = topo_parse();
612        parser = xml.parsers.expat.ParserCreate()
613        parser.EndElementHandler = tp.end_element
614        parser.CharacterDataHandler = tp.found_chars
615
616        parser.Parse(str)
617
618        return tp.topo
619       
620
621    def genviz(self, topo):
622        """
623        Generate the visualization the virtual topology
624        """
625
626        neato = "/usr/local/bin/neato"
627        # These are used to parse neato output and to create the visualization
628        # file.
629        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
630        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
631                "%s</type></node>"
632
633        try:
634            # Node names
635            nodes = [ n['vname'] for n in topo['node'] ]
636            topo_lans = topo['lan']
637        except KeyError, e:
638            raise service_error(service_error.internal, "Bad topology: %s" %e)
639
640        lans = { }
641        links = { }
642
643        # Walk through the virtual topology, organizing the connections into
644        # 2-node connections (links) and more-than-2-node connections (lans).
645        # When a lan is created, it's added to the list of nodes (there's a
646        # node in the visualization for the lan).
647        for l in topo_lans:
648            if links.has_key(l['vname']):
649                if len(links[l['vname']]) < 2:
650                    links[l['vname']].append(l['vnode'])
651                else:
652                    nodes.append(l['vname'])
653                    lans[l['vname']] = links[l['vname']]
654                    del links[l['vname']]
655                    lans[l['vname']].append(l['vnode'])
656            elif lans.has_key(l['vname']):
657                lans[l['vname']].append(l['vnode'])
658            else:
659                links[l['vname']] = [ l['vnode'] ]
660
661
662        # Open up a temporary file for dot to turn into a visualization
663        try:
664            df, dotname = tempfile.mkstemp()
665            dotfile = os.fdopen(df, 'w')
666        except IOError:
667            raise service_error(service_error.internal,
668                    "Failed to open file in genviz")
669
670        try:
671            dnull = open('/dev/null', 'w')
672        except IOError:
673            service_error(service_error.internal,
674                    "Failed to open /dev/null in genviz")
675
676        # Generate a dot/neato input file from the links, nodes and lans
677        try:
678            print >>dotfile, "graph G {"
679            for n in nodes:
680                print >>dotfile, '\t"%s"' % n
681            for l in links.keys():
682                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
683            for l in lans.keys():
684                for n in lans[l]:
685                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
686            print >>dotfile, "}"
687            dotfile.close()
688        except TypeError:
689            raise service_error(service_error.internal,
690                    "Single endpoint link in vtopo")
691        except IOError:
692            raise service_error(service_error.internal, "Cannot write dot file")
693
694        # Use dot to create a visualization
695        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
696                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
697                close_fds=True)
698        dnull.close()
699
700        # Translate dot to vis format
701        vis_nodes = [ ]
702        vis = { 'node': vis_nodes }
703        for line in dot.stdout:
704            m = vis_re.match(line)
705            if m:
706                vn = m.group(1)
707                vis_node = {'name': vn, \
708                        'x': float(m.group(2)),\
709                        'y' : float(m.group(3)),\
710                    }
711                if vn in links.keys() or vn in lans.keys():
712                    vis_node['type'] = 'lan'
713                else:
714                    vis_node['type'] = 'node'
715                vis_nodes.append(vis_node)
716        rv = dot.wait()
717
718        os.remove(dotname)
719        if rv == 0 : return vis
720        else: return None
721
722    def get_access(self, tb, nodes, user, tbparam, master, export_project,
723            access_user):
724        """
725        Get access to testbed through fedd and set the parameters for that tb
726        """
727        uri = self.tbmap.get(tb, None)
728        if not uri:
729            raise service_error(serice_error.server_config, 
730                    "Unknown testbed: %s" % tb)
731
732        # currently this lumps all users into one service access group
733        service_keys = [ a for u in user \
734                for a in u.get('access', []) \
735                    if a.has_key('sshPubkey')]
736
737        if len(service_keys) == 0:
738            raise service_error(service_error.req, 
739                    "Must have at least one SSH pubkey for services")
740
741        # Tweak search order so that if there are entries in access_user that
742        # have a project matching the export project, we try them first
743        if export_project and export_project.has_key('localname'): 
744            pn = export_project['localname'] 
745               
746            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
747            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
748        else: 
749            access_sequence = access_user
750
751        for p, u in access_sequence: 
752            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
753                    "to %s") %  ((p or "None"), u, uri))
754
755            if p:
756                # Request with user and project specified
757                req = {\
758                        'destinationTestbed' : { 'uri' : uri },
759                        'project': { 
760                            'name': {'localname': p},
761                            'user': [ {'userID': { 'localname': u } } ],
762                            },
763                        'user':  user,
764                        'allocID' : { 'localname': 'test' },
765                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
766                        'serviceAccess' : service_keys
767                    }
768            else:
769                # Request with only user specified
770                req = {\
771                        'destinationTestbed' : { 'uri' : uri },
772                        'user':  [ {'userID': { 'localname': u } } ],
773                        'allocID' : { 'localname': 'test' },
774                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
775                        'serviceAccess' : service_keys
776                    }
777
778            if tb == master:
779                # NB, the export_project parameter is a dict that includes
780                # the type
781                req['exportProject'] = export_project
782
783            # node resources if any
784            if nodes != None and len(nodes) > 0:
785                rnodes = [ ]
786                for n in nodes:
787                    rn = { }
788                    image, hw, count = n.split(":")
789                    if image: rn['image'] = [ image ]
790                    if hw: rn['hardware'] = [ hw ]
791                    if count and int(count) >0 : rn['count'] = int(count)
792                    rnodes.append(rn)
793                req['resources']= { }
794                req['resources']['node'] = rnodes
795
796            try:
797                if self.local_access.has_key(uri):
798                    # Local access call
799                    req = { 'RequestAccessRequestBody' : req }
800                    r = self.local_access[uri].RequestAccess(req, 
801                            fedid(file=self.cert_file))
802                    r = { 'RequestAccessResponseBody' : r }
803                else:
804                    r = self.call_RequestAccess(uri, req, 
805                            self.cert_file, self.cert_pwd, self.trusted_certs)
806            except service_error, e:
807                if e.code == service_error.access:
808                    self.log.debug("[get_access] Access denied")
809                    r = None
810                    continue
811                else:
812                    raise e
813
814            if r.has_key('RequestAccessResponseBody'):
815                # Through to here we have a valid response, not a fault.
816                # Access denied is a fault, so something better or worse than
817                # access denied has happened.
818                r = r['RequestAccessResponseBody']
819                self.log.debug("[get_access] Access granted")
820                break
821            else:
822                raise service_error(service_error.protocol,
823                        "Bad proxy response")
824       
825        if not r:
826            raise service_error(service_error.access, 
827                    "Access denied by %s (%s)" % (tb, uri))
828
829        if r.has_key('emulab'):
830            e = r['emulab']
831            p = e['project']
832            tbparam[tb] = { 
833                    "boss": e['boss'],
834                    "host": e['ops'],
835                    "domain": e['domain'],
836                    "fs": e['fileServer'],
837                    "eventserver": e['eventServer'],
838                    "project": unpack_id(p['name']),
839                    "emulab" : e,
840                    "allocID" : r['allocID'],
841                    "uri": uri,
842                    }
843            # Make the testbed name be the label the user applied
844            p['testbed'] = {'localname': tb }
845
846            for u in p['user']:
847                role = u.get('role', None)
848                if role == 'experimentCreation':
849                    tbparam[tb]['user'] = unpack_id(u['userID'])
850                    break
851            else:
852                raise service_error(service_error.internal, 
853                        "No createExperimentUser from %s" %tb)
854            # Add attributes to parameter space.  We don't allow attributes to
855            # overlay any parameters already installed.
856            for a in e['fedAttr']:
857                try:
858                    if a['attribute'] and \
859                            isinstance(a['attribute'], basestring)\
860                            and not tbparam[tb].has_key(a['attribute'].lower()):
861                        tbparam[tb][a['attribute'].lower()] = a['value']
862                except KeyError:
863                    self.log.error("Bad attribute in response: %s" % a)
864        else:
865            tbparam[tb] = { 
866                "allocID" : r['allocID'],
867                "uri": uri,
868            }
869
870       
871    def release_access(self, tb, aid, uri=None):
872        """
873        Release access to testbed through fedd
874        """
875
876        if not uri:
877            uri = self.tbmap.get(tb, None)
878        if not uri:
879            raise service_error(service_error.server_config, 
880                    "Unknown testbed: %s" % tb)
881
882        if self.local_access.has_key(uri):
883            resp = self.local_access[uri].ReleaseAccess(\
884                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
885                    fedid(file=self.cert_file))
886            resp = { 'ReleaseAccessResponseBody': resp } 
887        else:
888            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
889                    self.cert_file, self.cert_pwd, self.trusted_certs)
890
891        # better error coding
892
893    def remote_splitter(self, uri, desc, master):
894
895        req = {
896                'description' : { 'ns2description': desc },
897                'master': master,
898                'include_fedkit': bool(self.fedkit),
899                'include_gatewaykit': bool(self.gatewaykit)
900            }
901
902        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
903                self.trusted_certs)
904
905        if r.has_key('Ns2SplitResponseBody'):
906            r = r['Ns2SplitResponseBody']
907            if r.has_key('output'):
908                return r['output'].splitlines()
909            else:
910                raise service_error(service_error.protocol, 
911                        "Bad splitter response (no output)")
912        else:
913            raise service_error(service_error.protocol, "Bad splitter response")
914
915    class start_segment:
916        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
917                cert_pwd=None, trusted_certs=None, caller=None,
918                log_collector=None):
919            self.log = log
920            self.debug = debug
921            self.cert_file = cert_file
922            self.cert_pwd = cert_pwd
923            self.trusted_certs = None
924            self.caller = caller
925            self.testbed = testbed
926            self.log_collector = log_collector
927            self.response = None
928
929        def __call__(self, uri, aid, topo, master, attrs=None):
930            req = {
931                    'allocID': { 'fedid' : aid }, 
932                    'segmentdescription': { 
933                        'topdldescription': topo.to_dict(),
934                    },
935                    'master': master,
936                }
937            if attrs:
938                req['fedAttr'] = attrs
939
940            try:
941                self.log.debug("Calling StartSegment at %s " % uri)
942                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
943                        self.trusted_certs)
944                if r.has_key('StartSegmentResponseBody'):
945                    lval = r['StartSegmentResponseBody'].get('allocationLog',
946                            None)
947                    if lval and self.log_collector:
948                        for line in  lval.splitlines(True):
949                            self.log_collector.write(line)
950                    self.response = r
951                else:
952                    raise service_error(service_error.internal, 
953                            "Bad response!?: %s" %r)
954                return True
955            except service_error, e:
956                self.log.error("Start segment failed on %s: %s" % \
957                        (self.testbed, e))
958                return False
959
960
961
962    class terminate_segment:
963        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
964                cert_pwd=None, trusted_certs=None, caller=None):
965            self.log = log
966            self.debug = debug
967            self.cert_file = cert_file
968            self.cert_pwd = cert_pwd
969            self.trusted_certs = None
970            self.caller = caller
971            self.testbed = testbed
972
973        def __call__(self, uri, aid ):
974            req = {
975                    'allocID': aid , 
976                }
977            try:
978                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
979                        self.trusted_certs)
980                return True
981            except service_error, e:
982                self.log.error("Terminate segment failed on %s: %s" % \
983                        (self.testbed, e))
984                return False
985   
986
987    def allocate_resources(self, allocated, master, eid, expid, expcert, 
988            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
989            attrs=None):
990        def get_vlan(r):
991            if r.has_key('StartSegmentResponseBody'):
992                srb = r['StartSegmentResponseBody']
993                if srb.has_key('fedAttr'):
994                    for k, v in [ (a['attribute'], a['value']) \
995                            for a in srb['fedAttr']]:
996                        if k == 'vlan': return v
997            return None
998
999        started = { }           # Testbeds where a sub-experiment started
1000                                # successfully
1001
1002        # XXX
1003        fail_soft = False
1004
1005        slaves = [ k for k in allocated.keys() \
1006                if k != master and not topo[k].get_attribute('transit')]
1007        transit = [ k for k in allocated.keys() \
1008                if topo[k].get_attribute('transit')]
1009
1010        log = alloc_log or self.log
1011
1012        thread_pool = self.thread_pool(self.nthreads)
1013        threads = [ ]
1014
1015        for tb in transit:
1016            uri = tbparams[tb]['uri']
1017            if tbparams[tb].has_key('allocID') and \
1018                    tbparams[tb]['allocID'].has_key('fedid'):
1019                aid = tbparams[tb]['allocID']['fedid']
1020            else:
1021                raise service_error(service_error.internal, 
1022                        "No alloc id for testbed %s !?" % tb)
1023
1024            m = re.search('(\d+)', tb)
1025            if m:
1026                to_repl = "unassigned%s" % m.group(1)
1027            else:
1028                raise service_error(service_error.internal, 
1029                        "Bad dynamic allocation name")
1030                break
1031
1032            ss = self.start_segment(log=log, debug=self.debug, 
1033                testbed=tb, cert_file=self.cert_file, 
1034                cert_pwd=self.cert_pwd, 
1035                trusted_certs=self.trusted_certs,
1036                caller=self.call_StartSegment,
1037                log_collector=log_collector)
1038            t = self.pooled_thread(
1039                    target=ss,
1040                    args =(uri, aid, topo[tb], False, attrs),
1041                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1042            threads.append(t)
1043            t.start()
1044            # Wait until the this transit node finishes (keep pinging the log,
1045            # though)
1046
1047            mins = 0
1048            while not thread_pool.wait_for_all_done(60.0):
1049                mins += 1
1050                alloc_log.info("Waiting for transit (it has been %d mins)" \
1051                        % mins)
1052
1053            if t.rv:
1054                vlan = get_vlan(ss.response)
1055                if vlan is not None:
1056                    for k, t in topo.items():
1057                        for e in t.elements:
1058                            for i in e.interface:
1059                                vl = i.get_attribute('dragon_vlan')
1060                                if vl is not None and vl == to_repl:
1061                                    i.set_attribute('dragon_vlan', vlan)
1062            else:
1063                break
1064            thread_pool.clear()
1065
1066
1067        failed = [ t.getName() for t in threads if not t.rv ]
1068
1069        if len(failed) == 0:
1070            for tb in slaves:
1071                # Create and start a thread to start the segment, and save it
1072                # to get the return value later
1073                thread_pool.wait_for_slot()
1074                uri = self.tbmap.get(tb, None)
1075                if not uri:
1076                    raise service_error(service_error.internal, 
1077                            "Unknown testbed %s !?" % tb)
1078
1079                if tbparams[tb].has_key('allocID') and \
1080                        tbparams[tb]['allocID'].has_key('fedid'):
1081                    aid = tbparams[tb]['allocID']['fedid']
1082                else:
1083                    raise service_error(service_error.internal, 
1084                            "No alloc id for testbed %s !?" % tb)
1085
1086                t  = self.pooled_thread(\
1087                        target=self.start_segment(log=log, debug=self.debug,
1088                            testbed=tb, cert_file=self.cert_file,
1089                            cert_pwd=self.cert_pwd,
1090                            trusted_certs=self.trusted_certs,
1091                            caller=self.call_StartSegment,
1092                            log_collector=log_collector), 
1093                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1094                        pdata=thread_pool, trace_file=self.trace_file)
1095                threads.append(t)
1096                t.start()
1097
1098            # Wait until all finish (keep pinging the log, though)
1099            mins = 0
1100            while not thread_pool.wait_for_all_done(60.0):
1101                mins += 1
1102                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1103                        % mins)
1104
1105            thread_pool.clear()
1106
1107        # If none failed, start the master
1108        failed = [ t.getName() for t in threads if not t.rv ]
1109
1110        if len(failed) == 0:
1111            uri = self.tbmap.get(master, None)
1112            if not uri:
1113                raise service_error(service_error.internal, 
1114                        "Unknown testbed %s !?" % master)
1115
1116            if tbparams[master].has_key('allocID') and \
1117                    tbparams[master]['allocID'].has_key('fedid'):
1118                aid = tbparams[master]['allocID']['fedid']
1119            else:
1120                raise service_error(service_error.internal, 
1121                    "No alloc id for testbed %s !?" % master)
1122            t = self.pooled_thread(
1123                    target=self.start_segment(log=log, debug=self.debug, 
1124                        testbed=master, cert_file=self.cert_file, 
1125                        cert_pwd=self.cert_pwd, 
1126                        trusted_certs=self.trusted_certs,
1127                        caller=self.call_StartSegment,
1128                        log_collector=log_collector),
1129                    args =(uri, aid, topo[master], True, attrs),
1130                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1131            threads.append(t)
1132            t.start()
1133            # Wait until the master finishes (keep pinging the log, though)
1134            mins = 0
1135            while not thread_pool.wait_for_all_done(60.0):
1136                mins += 1
1137                alloc_log.info("Waiting for master (it has been %d mins)" \
1138                        % mins)
1139            # update failed to include the master, if it failed
1140            failed = [ t.getName() for t in threads if not t.rv ]
1141
1142        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1143        # If one failed clean up, unless fail_soft is set
1144        if failed:
1145            if not fail_soft:
1146                thread_pool.clear()
1147                for tb in succeeded:
1148                    # Create and start a thread to stop the segment
1149                    thread_pool.wait_for_slot()
1150                    uri = tbparams[tb]['uri']
1151                    t  = self.pooled_thread(\
1152                            target=self.terminate_segment(log=log,
1153                                testbed=tb,
1154                                cert_file=self.cert_file, 
1155                                cert_pwd=self.cert_pwd,
1156                                trusted_certs=self.trusted_certs,
1157                                caller=self.call_TerminateSegment),
1158                            args=(uri, tbparams[tb]['federant']['allocID']),
1159                            name=tb,
1160                            pdata=thread_pool, trace_file=self.trace_file)
1161                    t.start()
1162                # Wait until all finish
1163                thread_pool.wait_for_all_done()
1164
1165                # release the allocations
1166                for tb in tbparams.keys():
1167                    self.release_access(tb, tbparams[tb]['allocID'],
1168                            tbparams[tb].get('uri', None))
1169                # Remove the placeholder
1170                self.state_lock.acquire()
1171                self.state[eid]['experimentStatus'] = 'failed'
1172                if self.state_filename: self.write_state()
1173                self.state_lock.release()
1174
1175                log.error("Swap in failed on %s" % ",".join(failed))
1176                return
1177        else:
1178            log.info("[start_segment]: Experiment %s active" % eid)
1179
1180
1181        # Walk up tmpdir, deleting as we go
1182        if self.cleanup:
1183            log.debug("[start_experiment]: removing %s" % tmpdir)
1184            for path, dirs, files in os.walk(tmpdir, topdown=False):
1185                for f in files:
1186                    os.remove(os.path.join(path, f))
1187                for d in dirs:
1188                    os.rmdir(os.path.join(path, d))
1189            os.rmdir(tmpdir)
1190        else:
1191            log.debug("[start_experiment]: not removing %s" % tmpdir)
1192
1193        # Insert the experiment into our state and update the disk copy
1194        self.state_lock.acquire()
1195        self.state[expid]['experimentStatus'] = 'active'
1196        self.state[eid] = self.state[expid]
1197        if self.state_filename: self.write_state()
1198        self.state_lock.release()
1199        return
1200
1201
1202    def add_kit(self, e, kit):
1203        """
1204        Add a Software object created from the list of (install, location)
1205        tuples passed as kit  to the software attribute of an object e.  We
1206        do this enough to break out the code, but it's kind of a hack to
1207        avoid changing the old tuple rep.
1208        """
1209
1210        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1211
1212        if isinstance(e.software, list): e.software.extend(s)
1213        else: e.software = s
1214
1215
1216    def create_experiment_state(self, fid, req, expid, expcert):
1217        """
1218        Create the initial entry in the experiment's state.  The expid and
1219        expcert are the experiment's fedid and certifacte that represents that
1220        ID, which are installed in the experiment state.  If the request
1221        includes a suggested local name that is used if possible.  If the local
1222        name is already taken by an experiment owned by this user that has
1223        failed, it is overwriutten.  Otherwise new letters are added until a
1224        valid localname is found.  The generated local name is returned.
1225        """
1226
1227        if req.has_key('experimentID') and \
1228                req['experimentID'].has_key('localname'):
1229            overwrite = False
1230            eid = req['experimentID']['localname']
1231            # If there's an old failed experiment here with the same local name
1232            # and accessible by this user, we'll overwrite it, otherwise we'll
1233            # fall through and do the collision avoidance.
1234            old_expid = self.get_experiment_fedid(eid)
1235            if old_expid and self.check_experiment_access(fid, old_expid):
1236                self.state_lock.acquire()
1237                status = self.state[eid].get('experimentStatus', None)
1238                if status and status == 'failed':
1239                    # remove the old access attribute
1240                    self.auth.unset_attribute(fid, old_expid)
1241                    overwrite = True
1242                    del self.state[eid]
1243                    del self.state[old_expid]
1244                self.state_lock.release()
1245            self.state_lock.acquire()
1246            while (self.state.has_key(eid) and not overwrite):
1247                eid += random.choice(string.ascii_letters)
1248            # Initial state
1249            self.state[eid] = {
1250                    'experimentID' : \
1251                            [ { 'localname' : eid }, {'fedid': expid } ],
1252                    'experimentStatus': 'starting',
1253                    'experimentAccess': { 'X509' : expcert },
1254                    'owner': fid,
1255                    'log' : [],
1256                }
1257            self.state[expid] = self.state[eid]
1258            if self.state_filename: self.write_state()
1259            self.state_lock.release()
1260        else:
1261            eid = self.exp_stem
1262            for i in range(0,5):
1263                eid += random.choice(string.ascii_letters)
1264            self.state_lock.acquire()
1265            while (self.state.has_key(eid)):
1266                eid = self.exp_stem
1267                for i in range(0,5):
1268                    eid += random.choice(string.ascii_letters)
1269            # Initial state
1270            self.state[eid] = {
1271                    'experimentID' : \
1272                            [ { 'localname' : eid }, {'fedid': expid } ],
1273                    'experimentStatus': 'starting',
1274                    'experimentAccess': { 'X509' : expcert },
1275                    'owner': fid,
1276                    'log' : [],
1277                }
1278            self.state[expid] = self.state[eid]
1279            if self.state_filename: self.write_state()
1280            self.state_lock.release()
1281
1282        return eid
1283
1284
1285    def allocate_ips_to_topo(self, top):
1286        """
1287        Add an ip4_address attribute to all the hosts in the topology, based on
1288        the shared substrates on which they sit.  An /etc/hosts file is also
1289        created and returned as a list of hostfiles entries.  We also return
1290        the allocator, because we may need to allocate IPs to portals
1291        (specifically DRAGON portals).
1292        """
1293        subs = sorted(top.substrates, 
1294                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1295                reverse=True)
1296        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1297        ifs = { }
1298        hosts = [ ]
1299
1300        for idx, s in enumerate(subs):
1301            a = ips.allocate(len(s.interfaces)+2)
1302            if a :
1303                base, num = a
1304                if num < len(s.interfaces) +2 : 
1305                    raise service_error(service_error.internal,
1306                            "Allocator returned wrong number of IPs??")
1307            else:
1308                raise service_error(service_error.req, 
1309                        "Cannot allocate IP addresses")
1310
1311            base += 1
1312            for i in s.interfaces:
1313                i.attribute.append(
1314                        topdl.Attribute('ip4_address', 
1315                            "%s" % ip_addr(base)))
1316                hname = i.element.name[0]
1317                if ifs.has_key(hname):
1318                    hosts.append("%s\t%s-%s %s-%d" % \
1319                            (ip_addr(base), hname, s.name, hname,
1320                                ifs[hname]))
1321                else:
1322                    ifs[hname] = 0
1323                    hosts.append("%s\t%s-%s %s-%d %s" % \
1324                            (ip_addr(base), hname, s.name, hname,
1325                                ifs[hname], hname))
1326
1327                ifs[hname] += 1
1328                base += 1
1329        return hosts, ips
1330
1331    def get_access_to_testbeds(self, testbeds, user, access_user, 
1332            export_project, master, allocated, tbparams):
1333        """
1334        Request access to the various testbeds required for this instantiation
1335        (passed in as testbeds).  User, access_user, expoert_project and master
1336        are used to construct the correct requests.  Per-testbed parameters are
1337        returned in tbparams.
1338        """
1339        for tb in testbeds:
1340            self.get_access(tb, None, user, tbparams, master,
1341                    export_project, access_user)
1342            allocated[tb] = 1
1343
1344    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1345        """
1346        Create the sub-topologies that are needed for experimetn instantiation.
1347        Along the way attach startup commands to the computers in the
1348        subtopologies.
1349        """
1350        for tb in testbeds:
1351            topo[tb] = top.clone()
1352            to_delete = [ ]
1353            for e in topo[tb].elements:
1354                etb = e.get_attribute('testbed')
1355                if etb and etb != tb:
1356                    for i in e.interface:
1357                        for s in i.subs:
1358                            try:
1359                                s.interfaces.remove(i)
1360                            except ValueError:
1361                                raise service_error(service_error.internal,
1362                                        "Can't remove interface??")
1363                    to_delete.append(e)
1364            for e in to_delete:
1365                topo[tb].elements.remove(e)
1366            topo[tb].make_indices()
1367
1368            for e in [ e for e in topo[tb].elements \
1369                    if isinstance(e,topdl.Computer)]:
1370                if tb == master:
1371                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1372                else:
1373                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1374                scmd = e.get_attribute('startup')
1375                if scmd:
1376                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1377
1378                e.set_attribute('startup', cmd)
1379                if self.fedkit: self.add_kit(e, self.fedkit)
1380
1381    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1382            portal_type, iface_desc=()):
1383        sproject = tbparams[st].get('project', 'project')
1384        dproject = tbparams[dt].get('project', 'project')
1385        mproject = tbparams[master].get('project', 'project')
1386        sdomain = tbparams[st].get('domain', ".example.com")
1387        ddomain = tbparams[dt].get('domain', ".example.com")
1388        mdomain = tbparams[master].get('domain', '.example.com')
1389        muser = tbparams[master].get('user', 'root')
1390        smbshare = tbparams[master].get('smbshare', 'USERS')
1391        aid = tbparams[dt]['allocID']['fedid']
1392        if st == master or dt == master:
1393            active = ("%s" % (st == master))
1394        else:
1395            active = ("%s" %(st > dt))
1396
1397        ifaces = [ ]
1398        for sub, attrs in iface_desc:
1399            inf = topdl.Interface(
1400                    substrate=sub,
1401                    attribute=[
1402                        topdl.Attribute(
1403                            attribute=n,
1404                            value = v)
1405                        for n, v in attrs
1406                        ]
1407                    )
1408            ifaces.append(inf)
1409        return topdl.Computer(
1410                name=myname,
1411                attribute=[ 
1412                    topdl.Attribute(attribute=n,value=v)
1413                        for n, v in (\
1414                            ('portal', 'true'),
1415                            ('domain', sdomain),
1416                            ('masterdomain', mdomain),
1417                            ('masterexperiment', "%s/%s" % \
1418                                    (mproject, eid)),
1419                            ('masteruser', muser),
1420                            ('smbshare', smbshare),
1421                            ('experiment', "%s/%s" % \
1422                                    (sproject, eid)),
1423                            ('peer', "%s" % desthost),
1424                            ('peer_segment', "%s" % aid),
1425                            ('scriptdir', 
1426                                "/usr/local/federation/bin"),
1427                            ('active', "%s" % active),
1428                            ('portal_type', portal_type), 
1429                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1430                    ],
1431                interface=ifaces,
1432                )
1433
1434    def new_portal_substrate(self, st, dt, eid, tbparams):
1435        ddomain = tbparams[dt].get('domain', ".example.com")
1436        dproject = tbparams[dt].get('project', 'project')
1437        tsubstrate = \
1438                topdl.Substrate(name='%s-%s' % (st, dt),
1439                        attribute= [
1440                            topdl.Attribute(
1441                                attribute='portal',
1442                                value='true')
1443                            ]
1444                        )
1445        segment_element = topdl.Segment(
1446                id= tbparams[dt]['allocID'],
1447                type='emulab',
1448                uri = self.tbmap.get(dt, None),
1449                interface=[ 
1450                    topdl.Interface(
1451                        substrate=tsubstrate.name),
1452                    ],
1453                attribute = [
1454                    topdl.Attribute(attribute=n, value=v)
1455                        for n, v in (\
1456                            ('domain', ddomain),
1457                            ('experiment', "%s/%s" % \
1458                                    (dproject, eid)),)
1459                    ],
1460                )
1461
1462        return (tsubstrate, segment_element)
1463
1464    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1465        if sub.capacity is None:
1466            raise service_error(service_error.internal,
1467                    "Cannot DRAGON split substrate w/o capacity")
1468        segs = [ ]
1469        substr = topdl.Substrate(name="dragon%d" % idx, 
1470                capacity=sub.capacity.clone(),
1471                attribute=[ topdl.Attribute(attribute=n, value=v)
1472                    for n, v, in (\
1473                            ('vlan', 'unassigned%d' % idx),)])
1474        for tb in tbs.keys():
1475            seg = topdl.Segment(
1476                    id = tbparams[tb]['allocID'],
1477                    type='emulab',
1478                    uri = self.tbmap.get(tb, None),
1479                    interface=[ 
1480                        topdl.Interface(
1481                            substrate=substr.name),
1482                        ],
1483                    attribute=[ topdl.Attribute(
1484                        attribute='dragon_endpoint', 
1485                        value=tbparams[tb]['dragon']),
1486                        ]
1487                    )
1488            if tbparams[tb].has_key('vlans'):
1489                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1490            segs.append(seg)
1491
1492        topo["dragon%d" %idx] = \
1493                topdl.Topology(substrates=[substr], elements=segs,
1494                        attribute=[
1495                            topdl.Attribute(attribute="transit", value='true'),
1496                            topdl.Attribute(attribute="dynamic", value='true'),
1497                            topdl.Attribute(attribute="testbed", value='dragon'),
1498                            ]
1499                        )
1500
1501    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1502        """
1503        Add attribiutes to the various elements indicating that they are to be
1504        dragon connected and create a dragon segment in tops to be
1505        instantiated.
1506        """
1507
1508        def get_substrate_from_topo(name, t):
1509            for s in t.substrates:
1510                if s.name == name: return s
1511            else: return None
1512
1513        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1514        elements = [ i.element for i in sub.interfaces ]
1515        count = { }
1516        for e in elements:
1517            tb = e.get_attribute('testbed')
1518            count[tb] = count.get(tb, 0) + 1
1519
1520        for tb in tbs.keys():
1521            s = get_substrate_from_topo(sub.name, topo[tb])
1522            if s:
1523                for i in s.interfaces:
1524                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1525                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1526                    else: i.set_attribute('dragon_type', 'link')
1527            else:
1528                raise service_error(service_error.internal,
1529                        "No substrate %s in testbed %s" % (sub.name, tb))
1530
1531        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1532
1533    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1534            segment_substrate, portals):
1535        # More than one testbed is on this substrate.  Insert
1536        # some portals into the subtopologies.  st == source testbed,
1537        # dt == destination testbed.
1538        for st in tbs.keys():
1539            if not segment_substrate.has_key(st):
1540                segment_substrate[st] = { }
1541            if not portals.has_key(st): 
1542                portals[st] = { }
1543            for dt in [ t for t in tbs.keys() if t != st]:
1544                sproject = tbparams[st].get('project', 'project')
1545                dproject = tbparams[dt].get('project', 'project')
1546                mproject = tbparams[master].get('project', 'project')
1547                sdomain = tbparams[st].get('domain', ".example.com")
1548                ddomain = tbparams[dt].get('domain', ".example.com")
1549                mdomain = tbparams[master].get('domain', '.example.com')
1550                muser = tbparams[master].get('user', 'root')
1551                smbshare = tbparams[master].get('smbshare', 'USERS')
1552                aid = tbparams[dt]['allocID']['fedid']
1553                if st == master or dt == master:
1554                    active = ("%s" % (st == master))
1555                else:
1556                    active = ("%s" %(st > dt))
1557                if not segment_substrate[st].has_key(dt):
1558                    # Put a substrate and a segment for the connected
1559                    # testbed in there.
1560                    tsubstrate, segment_element = \
1561                            self.new_portal_substrate(st, dt, eid, tbparams)
1562                    segment_substrate[st][dt] = tsubstrate
1563                    topo[st].substrates.append(tsubstrate)
1564                    topo[st].elements.append(segment_element)
1565
1566                new_portal = False
1567                if portals[st].has_key(dt):
1568                    # There's a portal set up to go to this destination.
1569                    # See if there's room to multiples this connection on
1570                    # it.  If so, add an interface to the portal; if not,
1571                    # set up to add a portal below.
1572                    # [This little festival of braces is just a pop of the
1573                    # last element in the list of portals between st and
1574                    # dt.]
1575                    portal = portals[st][dt][-1]
1576                    mux = len([ i for i in portal.interface \
1577                            if not i.get_attribute('portal')])
1578                    if mux == self.muxmax:
1579                        new_portal = True
1580                        portal_type = "experiment"
1581                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1582                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1583                    else:
1584                        new_i = topdl.Interface(
1585                                substrate=sub.name,
1586                                attribute=[ 
1587                                    topdl.Attribute(
1588                                        attribute='ip4_address', 
1589                                        value=tbs[dt]
1590                                    )
1591                                ])
1592                        portal.interface.append(new_i)
1593                else:
1594                    # First connection to this testbed, make an empty list
1595                    # and set up to add the new portal below
1596                    new_portal = True
1597                    portals[st][dt] = [ ]
1598                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1599                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1600
1601                    if dt == master or st == master: portal_type = "both"
1602                    else: portal_type = "experiment"
1603
1604                if new_portal:
1605                    infs = (
1606                            (segment_substrate[st][dt].name, 
1607                                (('portal', 'true'),)),
1608                            (sub.name, 
1609                                (('ip4_address', tbs[dt]),))
1610                        )
1611                    portal =  self.new_portal_node(st, dt, tbparams, 
1612                            master, eid, myname, desthost, portal_type,
1613                            infs)
1614                    if self.fedkit:
1615                        self.add_kit(portal, self.fedkit)
1616                    if self.gatewaykit: 
1617                        self.add_kit(portal, self.gatewaykit)
1618
1619                    topo[st].elements.append(portal)
1620                    portals[st][dt].append(portal)
1621
1622    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1623        # Add to the master testbed
1624        tsubstrate, segment_element = \
1625                self.new_portal_substrate(st, dt, eid, tbparams)
1626        myname = "%stunnel" % dt
1627        desthost = "%stunnel" % st
1628
1629        portal = self.new_portal_node(st, dt, tbparams, master,
1630                eid, myname, desthost, "control", 
1631                ((tsubstrate.name,(('portal','true'),)),))
1632        if self.fedkit:
1633            self.add_kit(portal, self.fedkit)
1634        if self.gatewaykit: 
1635            self.add_kit(portal, self.gatewaykit)
1636
1637        topo[st].substrates.append(tsubstrate)
1638        topo[st].elements.append(segment_element)
1639        topo[st].elements.append(portal)
1640
1641    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1642            substrate, tbparams):
1643        # Add to the master testbed
1644        myname = "%stunnel" % dt
1645        desthost = "%s" % ip_addr(dip)
1646
1647        portal = self.new_portal_node(st, dt, tbparams, master,
1648                eid, myname, desthost, "control", 
1649                ((substrate.name,(
1650                    ('portal','true'),
1651                    ('ip4_address', "%s" % ip_addr(myip)),
1652                    ('dragon_vlan', 'unassigned%d' % idx),
1653                    ('dragon_type', 'link'),)),))
1654        if self.fedkit:
1655            self.add_kit(portal, self.fedkit)
1656        if self.gatewaykit: 
1657            self.add_kit(portal, self.gatewaykit)
1658
1659        return portal
1660
1661    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1662        """
1663        For each substrate in the main topology, find those that
1664        have nodes on more than one testbed.  Insert portal nodes
1665        into the copies of those substrates on the sub topologies.
1666        """
1667        segment_substrate = { }
1668        portals = { }
1669        for s in top.substrates:
1670            # tbs will contain an ip address on this subsrate that is in
1671            # each testbed.
1672            tbs = { }
1673            for i in s.interfaces:
1674                e = i.element
1675                tb = e.get_attribute('testbed')
1676                if tb and not tbs.has_key(tb):
1677                    for i in e.interface:
1678                        if s in i.subs:
1679                            tbs[tb]= i.get_attribute('ip4_address')
1680            if len(tbs) < 2:
1681                continue
1682
1683            # DRAGON will not create multi-site vlans yet
1684            if len(tbs) == 2 and \
1685                    all([tbparams[x].has_key('dragon') for x in tbs]):
1686                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1687                        master, eid)
1688            else:
1689                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1690                        eid, segment_substrate, portals)
1691
1692        # Make sure that all the slaves have a control portal back to the
1693        # master.
1694        for tb in [ t for t in tbparams.keys() if t != master ]:
1695            if len([e for e in topo[tb].elements \
1696                    if isinstance(e, topdl.Computer) and \
1697                    e.get_attribute('portal') and \
1698                    e.get_attribute('portal_type') == 'both']) == 0:
1699
1700                if tbparams[master].has_key('dragon') \
1701                        and tbparams[tb].has_key('dragon'):
1702
1703                    idx = len([x for x in topo.keys() \
1704                            if x.startswith('dragon')])
1705                    dip, leng = ip_allocator.allocate(4)
1706                    dip += 1
1707                    mip = dip+1
1708                    csub = topdl.Substrate(
1709                            name="dragon-control-%s" % tb,
1710                            capacity=topdl.Capacity(100000.0, 'max'),
1711                            attribute=[
1712                                topdl.Attribute(
1713                                    attribute='portal',
1714                                    value='true'
1715                                    )
1716                                ]
1717                            )
1718                    seg = topdl.Segment(
1719                            id= tbparams[master]['allocID'],
1720                            type='emulab',
1721                            uri = self.tbmap.get(master, None),
1722                            interface=[ 
1723                                topdl.Interface(
1724                                    substrate=csub.name),
1725                                ],
1726                            attribute = [
1727                                topdl.Attribute(attribute=n, value=v)
1728                                    for n, v in (\
1729                                        ('domain', 
1730                                            tbparams[master].get('domain',
1731                                                ".example.com")),
1732                                        ('experiment', "%s/%s" % \
1733                                                (tbparams[master].get(
1734                                                    'project', 
1735                                                    'project'), 
1736                                                    eid)),)
1737                                ],
1738                            )
1739                    topo[tb].substrates.append(csub)
1740                    topo[tb].elements.append(
1741                            self.new_dragon_portal(tb, master, master, eid, 
1742                                dip, mip, idx, csub, tbparams))
1743                    topo[tb].elements.append(seg)
1744
1745                    mcsub = csub.clone()
1746                    seg = topdl.Segment(
1747                            id= tbparams[tb]['allocID'],
1748                            type='emulab',
1749                            uri = self.tbmap.get(tb, None),
1750                            interface=[ 
1751                                topdl.Interface(
1752                                    substrate=csub.name),
1753                                ],
1754                            attribute = [
1755                                topdl.Attribute(attribute=n, value=v)
1756                                    for n, v in (\
1757                                        ('domain', 
1758                                            tbparams[tb].get('domain',
1759                                                ".example.com")),
1760                                        ('experiment', "%s/%s" % \
1761                                                (tbparams[tb].get('project', 
1762                                                    'project'), 
1763                                                    eid)),)
1764                                ],
1765                            )
1766                    topo[master].substrates.append(mcsub)
1767                    topo[master].elements.append(
1768                            self.new_dragon_portal(master, tb, master, eid, 
1769                                mip, dip, idx, mcsub, tbparams))
1770                    topo[master].elements.append(seg)
1771
1772                    self.create_dragon_substrate(csub, topo, 
1773                            {tb: 1, master:1}, tbparams, master, eid)
1774                else:
1775                    self.add_control_portal(master, tb, master, eid, topo, 
1776                            tbparams)
1777                    self.add_control_portal(tb, master, master, eid, topo, 
1778                            tbparams)
1779
1780        # Connect the portal nodes into the topologies and clear out
1781        # substrates that are not in the topologies
1782        for tb in tbparams.keys():
1783            topo[tb].incorporate_elements()
1784            topo[tb].substrates = \
1785                    [s for s in topo[tb].substrates \
1786                        if len(s.interfaces) >0]
1787
1788    def wrangle_software(self, expid, top, topo, tbparams):
1789        """
1790        Copy software out to the repository directory, allocate permissions and
1791        rewrite the segment topologies to look for the software in local
1792        places.
1793        """
1794
1795        # Copy the rpms and tarfiles to a distribution directory from
1796        # which the federants can retrieve them
1797        linkpath = "%s/software" %  expid
1798        softdir ="%s/%s" % ( self.repodir, linkpath)
1799        softmap = { }
1800        # These are in a list of tuples format (each kit).  This comprehension
1801        # unwraps them into a single list of tuples that initilaizes the set of
1802        # tuples.
1803        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1804                for p, t in l ])
1805        pkgs.update([x.location for e in top.elements \
1806                for x in e.software])
1807        try:
1808            os.makedirs(softdir)
1809        except IOError, e:
1810            raise service_error(
1811                    "Cannot create software directory: %s" % e)
1812        # The actual copying.  Everything's converted into a url for copying.
1813        for pkg in pkgs:
1814            loc = pkg
1815
1816            scheme, host, path = urlparse(loc)[0:3]
1817            dest = os.path.basename(path)
1818            if not scheme:
1819                if not loc.startswith('/'):
1820                    loc = "/%s" % loc
1821                loc = "file://%s" %loc
1822            try:
1823                u = urlopen(loc)
1824            except Exception, e:
1825                raise service_error(service_error.req, 
1826                        "Cannot open %s: %s" % (loc, e))
1827            try:
1828                f = open("%s/%s" % (softdir, dest) , "w")
1829                self.log.debug("Writing %s/%s" % (softdir,dest) )
1830                data = u.read(4096)
1831                while data:
1832                    f.write(data)
1833                    data = u.read(4096)
1834                f.close()
1835                u.close()
1836            except Exception, e:
1837                raise service_error(service_error.internal,
1838                        "Could not copy %s: %s" % (loc, e))
1839            path = re.sub("/tmp", "", linkpath)
1840            # XXX
1841            softmap[pkg] = \
1842                    "https://users.isi.deterlab.net:23235/%s/%s" %\
1843                    ( path, dest)
1844
1845            # Allow the individual segments to access the software.
1846            for tb in tbparams.keys():
1847                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1848                        "/%s/%s" % ( path, dest))
1849
1850        # Convert the software locations in the segments into the local
1851        # copies on this host
1852        for soft in [ s for tb in topo.values() \
1853                for e in tb.elements \
1854                    if getattr(e, 'software', False) \
1855                        for s in e.software ]:
1856            if softmap.has_key(soft.location):
1857                soft.location = softmap[soft.location]
1858
1859
1860    def create_experiment(self, req, fid):
1861        """
1862        The external interface to experiment creation called from the
1863        dispatcher.
1864
1865        Creates a working directory, splits the incoming description using the
1866        splitter script and parses out the avrious subsections using the
1867        lcasses above.  Once each sub-experiment is created, use pooled threads
1868        to instantiate them and start it all up.
1869        """
1870        if not self.auth.check_attribute(fid, 'create'):
1871            raise service_error(service_error.access, "Create access denied")
1872
1873        try:
1874            tmpdir = tempfile.mkdtemp(prefix="split-")
1875            os.mkdir(tmpdir+"/keys")
1876        except IOError:
1877            raise service_error(service_error.internal, "Cannot create tmp dir")
1878
1879        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1880        gw_secretkey_base = "fed.%s" % self.ssh_type
1881        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1882        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1883        tclfile = tmpdir + "/experiment.tcl"
1884        tbparams = { }
1885        try:
1886            access_user = self.accessdb[fid]
1887        except KeyError:
1888            raise service_error(service_error.internal,
1889                    "Access map and authorizer out of sync in " + \
1890                            "create_experiment for fedid %s"  % fid)
1891
1892        pid = "dummy"
1893        gid = "dummy"
1894
1895        req = req.get('CreateRequestBody', None)
1896        if not req:
1897            raise service_error(service_error.req,
1898                    "Bad request format (no CreateRequestBody)")
1899        # The tcl parser needs to read a file so put the content into that file
1900        descr=req.get('experimentdescription', None)
1901        if descr:
1902            file_content=descr.get('ns2description', None)
1903            if file_content:
1904                try:
1905                    f = open(tclfile, 'w')
1906                    f.write(file_content)
1907                    f.close()
1908                except IOError:
1909                    raise service_error(service_error.internal,
1910                            "Cannot write temp experiment description")
1911            else:
1912                raise service_error(service_error.req, 
1913                        "Only ns2descriptions supported")
1914        else:
1915            raise service_error(service_error.req, "No experiment description")
1916
1917        # Generate an ID for the experiment (slice) and a certificate that the
1918        # allocator can use to prove they own it.  We'll ship it back through
1919        # the encrypted connection.
1920        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1921
1922        eid = self.create_experiment_state(fid, req, expid, expcert)
1923        try: 
1924            # This catches exceptions to clear the placeholder if necessary
1925            try:
1926                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1927            except ValueError:
1928                raise service_error(service_error.server_config, 
1929                        "Bad key type (%s)" % self.ssh_type)
1930
1931            user = req.get('user', None)
1932            if user == None:
1933                raise service_error(service_error.req, "No user")
1934
1935            master = req.get('master', None)
1936            if not master:
1937                raise service_error(service_error.req,
1938                        "No master testbed label")
1939            export_project = req.get('exportProject', None)
1940            if not export_project:
1941                raise service_error(service_error.req, "No export project")
1942           
1943            # Translate to topdl
1944            if self.splitter_url:
1945                # XXX: need remote topdl translator
1946                self.log.debug("Calling remote splitter at %s" % \
1947                        self.splitter_url)
1948                split_data = self.remote_splitter(self.splitter_url,
1949                        file_content, master)
1950            else:
1951                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1952                    str(self.muxmax), '-m', master]
1953
1954                if self.fedkit:
1955                    tclcmd.append('-k')
1956
1957                if self.gatewaykit:
1958                    tclcmd.append('-K')
1959
1960                tclcmd.extend([pid, gid, eid, tclfile])
1961
1962                self.log.debug("running local splitter %s", " ".join(tclcmd))
1963                # This is just fantastic.  As a side effect the parser copies
1964                # tb_compat.tcl into the current directory, so that directory
1965                # must be writable by the fedd user.  Doing this in the
1966                # temporary subdir ensures this is the case.
1967                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1968                        cwd=tmpdir)
1969                split_data = tclparser.stdout
1970
1971            top = topdl.topology_from_xml(file=split_data, top="experiment")
1972
1973            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1974             # Find the testbeds to look up
1975            testbeds = set([ a.value for e in top.elements \
1976                    for a in e.attribute \
1977                    if a.attribute == 'testbed'] )
1978
1979            allocated = { }         # Testbeds we can access
1980            topo ={ }               # Sub topologies
1981            self.get_access_to_testbeds(testbeds, user, access_user, 
1982                    export_project, master, allocated, tbparams)
1983            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1984
1985            # Copy configuration files into the remote file store
1986            # The config urlpath
1987            configpath = "/%s/config" % expid
1988            # The config file system location
1989            configdir ="%s%s" % ( self.repodir, configpath)
1990            try:
1991                os.makedirs(configdir)
1992            except IOError, e:
1993                raise service_error(
1994                        "Cannot create config directory: %s" % e)
1995            try:
1996                f = open("%s/hosts" % configdir, "w")
1997                f.write('\n'.join(hosts))
1998                f.close()
1999            except IOError, e:
2000                raise service_error(service_error.internal, 
2001                        "Cannot write hosts file: %s" % e)
2002            try:
2003                copy_file("%s" % gw_pubkey, "%s/%s" % \
2004                        (configdir, gw_pubkey_base))
2005                copy_file("%s" % gw_secretkey, "%s/%s" % \
2006                        (configdir, gw_secretkey_base))
2007            except IOError, e:
2008                raise service_error(service_error.internal, 
2009                        "Cannot copy keyfiles: %s" % e)
2010
2011            # Allow the individual testbeds to access the configuration files.
2012            for tb in tbparams.keys():
2013                asignee = tbparams[tb]['allocID']['fedid']
2014                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2015                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2016
2017            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2018            # Now get access to the dynamic testbeds
2019            for k, t in topo.items():
2020                if not t.get_attribute('dynamic'):
2021                    continue
2022                tb = t.get_attribute('testbed')
2023                if tb: 
2024                    self.get_access(tb, None, user, tbparams, master, 
2025                            export_project, access_user)
2026                    tbparams[k] = tbparams[tb]
2027                    del tbparams[tb]
2028                    allocated[k] = 1
2029                else:
2030                    raise service_error(service_error.internal, 
2031                            "Dynamic allocation from no testbed!?")
2032
2033            self.wrangle_software(expid, top, topo, tbparams)
2034
2035            vtopo = topdl.topology_to_vtopo(top)
2036            vis = self.genviz(vtopo)
2037
2038            # save federant information
2039            for k in allocated.keys():
2040                tbparams[k]['federant'] = {
2041                        'name': [ { 'localname' : eid} ],
2042                        'allocID' : tbparams[k]['allocID'],
2043                        'master' : k == master,
2044                        'uri': tbparams[k]['uri'],
2045                    }
2046                if tbparams[k].has_key('emulab'):
2047                        tbparams[k]['federant']['emulab'] = \
2048                                tbparams[k]['emulab']
2049
2050            self.state_lock.acquire()
2051            self.state[eid]['vtopo'] = vtopo
2052            self.state[eid]['vis'] = vis
2053            self.state[expid]['federant'] = \
2054                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2055                        if tbparams[tb].has_key('federant') ]
2056            if self.state_filename: 
2057                self.write_state()
2058            self.state_lock.release()
2059        except service_error, e:
2060            # If something goes wrong in the parse (usually an access error)
2061            # clear the placeholder state.  From here on out the code delays
2062            # exceptions.  Failing at this point returns a fault to the remote
2063            # caller.
2064
2065            self.state_lock.acquire()
2066            del self.state[eid]
2067            del self.state[expid]
2068            if self.state_filename: self.write_state()
2069            self.state_lock.release()
2070            raise e
2071
2072
2073        # Start the background swapper and return the starting state.  From
2074        # here on out, the state will stick around a while.
2075
2076        # Let users touch the state
2077        self.auth.set_attribute(fid, expid)
2078        self.auth.set_attribute(expid, expid)
2079        # Override fedids can manipulate state as well
2080        for o in self.overrides:
2081            self.auth.set_attribute(o, expid)
2082
2083        # Create a logger that logs to the experiment's state object as well as
2084        # to the main log file.
2085        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2086        alloc_collector = self.list_log(self.state[eid]['log'])
2087        h = logging.StreamHandler(alloc_collector)
2088        # XXX: there should be a global one of these rather than repeating the
2089        # code.
2090        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2091                    '%d %b %y %H:%M:%S'))
2092        alloc_log.addHandler(h)
2093       
2094        # XXX
2095        url_base = 'https://users.isi.deterlab.net:23235'
2096        attrs = [ 
2097                {
2098                    'attribute': 'ssh_pubkey', 
2099                    'value': '%s/%s/config/%s' % \
2100                            (url_base, expid, gw_pubkey_base)
2101                },
2102                {
2103                    'attribute': 'ssh_secretkey', 
2104                    'value': '%s/%s/config/%s' % \
2105                            (url_base, expid, gw_secretkey_base)
2106                },
2107                {
2108                    'attribute': 'hosts', 
2109                    'value': '%s/%s/config/hosts' % \
2110                            (url_base, expid)
2111                },
2112                {
2113                    'attribute': 'experiment_name',
2114                    'value': eid,
2115                },
2116            ]
2117
2118        # Start a thread to do the resource allocation
2119        t  = Thread(target=self.allocate_resources,
2120                args=(allocated, master, eid, expid, expcert, tbparams, 
2121                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2122                name=eid)
2123        t.start()
2124
2125        rv = {
2126                'experimentID': [
2127                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2128                ],
2129                'experimentStatus': 'starting',
2130                'experimentAccess': { 'X509' : expcert }
2131            }
2132
2133        return rv
2134   
2135    def get_experiment_fedid(self, key):
2136        """
2137        find the fedid associated with the localname key in the state database.
2138        """
2139
2140        rv = None
2141        self.state_lock.acquire()
2142        if self.state.has_key(key):
2143            if isinstance(self.state[key], dict):
2144                try:
2145                    kl = [ f['fedid'] for f in \
2146                            self.state[key]['experimentID']\
2147                                if f.has_key('fedid') ]
2148                except KeyError:
2149                    self.state_lock.release()
2150                    raise service_error(service_error.internal, 
2151                            "No fedid for experiment %s when getting "+\
2152                                    "fedid(!?)" % key)
2153                if len(kl) == 1:
2154                    rv = kl[0]
2155                else:
2156                    self.state_lock.release()
2157                    raise service_error(service_error.internal, 
2158                            "multiple fedids for experiment %s when " +\
2159                                    "getting fedid(!?)" % key)
2160            else:
2161                self.state_lock.release()
2162                raise service_error(service_error.internal, 
2163                        "Unexpected state for %s" % key)
2164        self.state_lock.release()
2165        return rv
2166
2167    def check_experiment_access(self, fid, key):
2168        """
2169        Confirm that the fid has access to the experiment.  Though a request
2170        may be made in terms of a local name, the access attribute is always
2171        the experiment's fedid.
2172        """
2173        if not isinstance(key, fedid):
2174            key = self.get_experiment_fedid(key)
2175
2176        if self.auth.check_attribute(fid, key):
2177            return True
2178        else:
2179            raise service_error(service_error.access, "Access Denied")
2180
2181
2182    def get_handler(self, path, fid):
2183        if self.auth.check_attribute(fid, path):
2184            return ("%s/%s" % (self.repodir, path), "application/binary")
2185        else:
2186            return (None, None)
2187
2188    def get_vtopo(self, req, fid):
2189        """
2190        Return the stored virtual topology for this experiment
2191        """
2192        rv = None
2193        state = None
2194
2195        req = req.get('VtopoRequestBody', None)
2196        if not req:
2197            raise service_error(service_error.req,
2198                    "Bad request format (no VtopoRequestBody)")
2199        exp = req.get('experiment', None)
2200        if exp:
2201            if exp.has_key('fedid'):
2202                key = exp['fedid']
2203                keytype = "fedid"
2204            elif exp.has_key('localname'):
2205                key = exp['localname']
2206                keytype = "localname"
2207            else:
2208                raise service_error(service_error.req, "Unknown lookup type")
2209        else:
2210            raise service_error(service_error.req, "No request?")
2211
2212        self.check_experiment_access(fid, key)
2213
2214        self.state_lock.acquire()
2215        if self.state.has_key(key):
2216            if self.state[key].has_key('vtopo'):
2217                rv = { 'experiment' : {keytype: key },\
2218                        'vtopo': self.state[key]['vtopo'],\
2219                    }
2220            else:
2221                state = self.state[key]['experimentStatus']
2222        self.state_lock.release()
2223
2224        if rv: return rv
2225        else: 
2226            if state:
2227                raise service_error(service_error.partial, 
2228                        "Not ready: %s" % state)
2229            else:
2230                raise service_error(service_error.req, "No such experiment")
2231
2232    def get_vis(self, req, fid):
2233        """
2234        Return the stored visualization for this experiment
2235        """
2236        rv = None
2237        state = None
2238
2239        req = req.get('VisRequestBody', None)
2240        if not req:
2241            raise service_error(service_error.req,
2242                    "Bad request format (no VisRequestBody)")
2243        exp = req.get('experiment', None)
2244        if exp:
2245            if exp.has_key('fedid'):
2246                key = exp['fedid']
2247                keytype = "fedid"
2248            elif exp.has_key('localname'):
2249                key = exp['localname']
2250                keytype = "localname"
2251            else:
2252                raise service_error(service_error.req, "Unknown lookup type")
2253        else:
2254            raise service_error(service_error.req, "No request?")
2255
2256        self.check_experiment_access(fid, key)
2257
2258        self.state_lock.acquire()
2259        if self.state.has_key(key):
2260            if self.state[key].has_key('vis'):
2261                rv =  { 'experiment' : {keytype: key },\
2262                        'vis': self.state[key]['vis'],\
2263                        }
2264            else:
2265                state = self.state[key]['experimentStatus']
2266        self.state_lock.release()
2267
2268        if rv: return rv
2269        else:
2270            if state:
2271                raise service_error(service_error.partial, 
2272                        "Not ready: %s" % state)
2273            else:
2274                raise service_error(service_error.req, "No such experiment")
2275
2276    def clean_info_response(self, rv):
2277        """
2278        Remove the information in the experiment's state object that is not in
2279        the info response.
2280        """
2281        # Remove the owner info (should always be there, but...)
2282        if rv.has_key('owner'): del rv['owner']
2283
2284        # Convert the log into the allocationLog parameter and remove the
2285        # log entry (with defensive programming)
2286        if rv.has_key('log'):
2287            rv['allocationLog'] = "".join(rv['log'])
2288            del rv['log']
2289        else:
2290            rv['allocationLog'] = ""
2291
2292        if rv['experimentStatus'] != 'active':
2293            if rv.has_key('federant'): del rv['federant']
2294        else:
2295            # remove the allocationID and uri info from each federant
2296            for f in rv.get('federant', []):
2297                if f.has_key('allocID'): del f['allocID']
2298                if f.has_key('uri'): del f['uri']
2299        return rv
2300
2301    def get_info(self, req, fid):
2302        """
2303        Return all the stored info about this experiment
2304        """
2305        rv = None
2306
2307        req = req.get('InfoRequestBody', None)
2308        if not req:
2309            raise service_error(service_error.req,
2310                    "Bad request format (no InfoRequestBody)")
2311        exp = req.get('experiment', None)
2312        if exp:
2313            if exp.has_key('fedid'):
2314                key = exp['fedid']
2315                keytype = "fedid"
2316            elif exp.has_key('localname'):
2317                key = exp['localname']
2318                keytype = "localname"
2319            else:
2320                raise service_error(service_error.req, "Unknown lookup type")
2321        else:
2322            raise service_error(service_error.req, "No request?")
2323
2324        self.check_experiment_access(fid, key)
2325
2326        # The state may be massaged by the service function that called
2327        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2328        # state.
2329        self.state_lock.acquire()
2330        if self.state.has_key(key):
2331            rv = copy.deepcopy(self.state[key])
2332        self.state_lock.release()
2333
2334        if rv:
2335            return self.clean_info_response(rv)
2336        else:
2337            raise service_error(service_error.req, "No such experiment")
2338
2339    def get_multi_info(self, req, fid):
2340        """
2341        Return all the stored info that this fedid can access
2342        """
2343        rv = { 'info': [ ] }
2344
2345        self.state_lock.acquire()
2346        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2347            try:
2348                self.check_experiment_access(fid, key)
2349            except service_error, e:
2350                if e.code == service_error.access:
2351                    continue
2352                else:
2353                    self.state_lock.release()
2354                    raise e
2355
2356            if self.state.has_key(key):
2357                e = copy.deepcopy(self.state[key])
2358                e = self.clean_info_response(e)
2359                rv['info'].append(e)
2360        self.state_lock.release()
2361        return rv
2362
2363    def terminate_experiment(self, req, fid):
2364        """
2365        Swap this experiment out on the federants and delete the shared
2366        information
2367        """
2368        tbparams = { }
2369        req = req.get('TerminateRequestBody', None)
2370        if not req:
2371            raise service_error(service_error.req,
2372                    "Bad request format (no TerminateRequestBody)")
2373        force = req.get('force', False)
2374        exp = req.get('experiment', None)
2375        if exp:
2376            if exp.has_key('fedid'):
2377                key = exp['fedid']
2378                keytype = "fedid"
2379            elif exp.has_key('localname'):
2380                key = exp['localname']
2381                keytype = "localname"
2382            else:
2383                raise service_error(service_error.req, "Unknown lookup type")
2384        else:
2385            raise service_error(service_error.req, "No request?")
2386
2387        self.check_experiment_access(fid, key)
2388
2389        dealloc_list = [ ]
2390
2391
2392        # Create a logger that logs to the dealloc_list as well as to the main
2393        # log file.
2394        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2395        h = logging.StreamHandler(self.list_log(dealloc_list))
2396        # XXX: there should be a global one of these rather than repeating the
2397        # code.
2398        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2399                    '%d %b %y %H:%M:%S'))
2400        dealloc_log.addHandler(h)
2401
2402        self.state_lock.acquire()
2403        fed_exp = self.state.get(key, None)
2404
2405        if fed_exp:
2406            # This branch of the conditional holds the lock to generate a
2407            # consistent temporary tbparams variable to deallocate experiments.
2408            # It releases the lock to do the deallocations and reacquires it to
2409            # remove the experiment state when the termination is complete.
2410
2411            # First make sure that the experiment creation is complete.
2412            status = fed_exp.get('experimentStatus', None)
2413
2414            if status:
2415                if status in ('starting', 'terminating'):
2416                    if not force:
2417                        self.state_lock.release()
2418                        raise service_error(service_error.partial, 
2419                                'Experiment still being created or destroyed')
2420                    else:
2421                        self.log.warning('Experiment in %s state ' % status + \
2422                                'being terminated by force.')
2423            else:
2424                # No status??? trouble
2425                self.state_lock.release()
2426                raise service_error(service_error.internal,
2427                        "Experiment has no status!?")
2428
2429            ids = []
2430            #  experimentID is a list of dicts that are self-describing
2431            #  identifiers.  This finds all the fedids and localnames - the
2432            #  keys of self.state - and puts them into ids.
2433            for id in fed_exp.get('experimentID', []):
2434                if id.has_key('fedid'): ids.append(id['fedid'])
2435                if id.has_key('localname'): ids.append(id['localname'])
2436
2437            # Collect the allocation/segment ids into a dict keyed by the fedid
2438            # of the allocation (or a monotonically increasing integer) that
2439            # contains a tuple of uri, aid (which is a dict...)
2440            for i, fed in enumerate(fed_exp.get('federant', [])):
2441                try:
2442                    uri = fed['uri']
2443                    aid = fed['allocID']
2444                    k = fed['allocID'].get('fedid', i)
2445                except KeyError, e:
2446                    continue
2447                tbparams[k] = (uri, aid)
2448            fed_exp['experimentStatus'] = 'terminating'
2449            if self.state_filename: self.write_state()
2450            self.state_lock.release()
2451
2452            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2453            # then completes, so we can't wait if nothing starts.  So, no
2454            # tbparams, no start.
2455            if len(tbparams) > 0:
2456                thread_pool = self.thread_pool(self.nthreads)
2457                for k in tbparams.keys():
2458                    # Create and start a thread to stop the segment
2459                    thread_pool.wait_for_slot()
2460                    uri, aid = tbparams[k]
2461                    t  = self.pooled_thread(\
2462                            target=self.terminate_segment(log=dealloc_log,
2463                                testbed=uri,
2464                                cert_file=self.cert_file, 
2465                                cert_pwd=self.cert_pwd,
2466                                trusted_certs=self.trusted_certs,
2467                                caller=self.call_TerminateSegment),
2468                            args=(uri, aid), name=k,
2469                            pdata=thread_pool, trace_file=self.trace_file)
2470                    t.start()
2471                # Wait for completions
2472                thread_pool.wait_for_all_done()
2473
2474            # release the allocations (failed experiments have done this
2475            # already, and starting experiments may be in odd states, so we
2476            # ignore errors releasing those allocations
2477            try: 
2478                for k in tbparams.keys():
2479                    # This releases access by uri
2480                    uri, aid = tbparams[k]
2481                    self.release_access(None, aid, uri=uri)
2482            except service_error, e:
2483                if status != 'failed' and not force:
2484                    raise e
2485
2486            # Remove the terminated experiment
2487            self.state_lock.acquire()
2488            for id in ids:
2489                if self.state.has_key(id): del self.state[id]
2490
2491            if self.state_filename: self.write_state()
2492            self.state_lock.release()
2493
2494            return { 
2495                    'experiment': exp , 
2496                    'deallocationLog': "".join(dealloc_list),
2497                    }
2498        else:
2499            # Don't forget to release the lock
2500            self.state_lock.release()
2501            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.