source: fedd/federation/experiment_control.py @ 8c9933c

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 8c9933c was 69692a9, checked in by Ted Faber <faber@…>, 15 years ago

Looks like Dragon is being called correctly. Internals remain a bit messy.

  • Property mode set to 100644
File size: 85.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221
222        self.exp_stem = "fed-stem"
223        self.log = logging.getLogger("fedd.experiment_control")
224        set_log_level(config, "experiment_control", self.log)
225        self.muxmax = 2
226        self.nthreads = 2
227        self.randomize_experiments = False
228
229        self.splitter = None
230        self.ssh_keygen = "/usr/bin/ssh-keygen"
231        self.ssh_identity_file = None
232
233
234        self.debug = config.getboolean("experiment_control", "create_debug")
235        self.cleanup = not config.getboolean("experiment_control", 
236                "leave_tmpfiles")
237        self.state_filename = config.get("experiment_control", 
238                "experiment_state")
239        self.splitter_url = config.get("experiment_control", "splitter_uri")
240        self.fedkit = parse_tarfile_list(\
241                config.get("experiment_control", "fedkit"))
242        self.gatewaykit = parse_tarfile_list(\
243                config.get("experiment_control", "gatewaykit"))
244        accessdb_file = config.get("experiment_control", "accessdb")
245
246        self.ssh_pubkey_file = config.get("experiment_control", 
247                "ssh_pubkey_file")
248        self.ssh_privkey_file = config.get("experiment_control",
249                "ssh_privkey_file")
250        # NB for internal master/slave ops, not experiment setup
251        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
252
253        self.overrides = set([])
254        ovr = config.get('experiment_control', 'overrides')
255        if ovr:
256            for o in ovr.split(","):
257                o = o.strip()
258                if o.startswith('fedid:'): o = o[len('fedid:'):]
259                self.overrides.add(fedid(hexstr=o))
260
261        self.state = { }
262        self.state_lock = Lock()
263        self.tclsh = "/usr/local/bin/otclsh"
264        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
265                config.get("experiment_control", "tcl_splitter",
266                        "/usr/testbed/lib/ns2ir/parse.tcl")
267        mapdb_file = config.get("experiment_control", "mapdb")
268        self.trace_file = sys.stderr
269
270        self.def_expstart = \
271                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
272                "/tmp/federate";
273        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
274                "FEDDIR/hosts";
275        self.def_gwstart = \
276                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
277                "/tmp/bridge.log";
278        self.def_mgwstart = \
279                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
280                "/tmp/bridge.log";
281        self.def_gwimage = "FBSD61-TUNNEL2";
282        self.def_gwtype = "pc";
283        self.local_access = { }
284
285        if auth:
286            self.auth = auth
287        else:
288            self.log.error(\
289                    "[access]: No authorizer initialized, creating local one.")
290            auth = authorizer()
291
292
293        if self.ssh_pubkey_file:
294            try:
295                f = open(self.ssh_pubkey_file, 'r')
296                self.ssh_pubkey = f.read()
297                f.close()
298            except IOError:
299                raise service_error(service_error.internal,
300                        "Cannot read sshpubkey")
301        else:
302            raise service_error(service_error.internal, 
303                    "No SSH public key file?")
304
305        if not self.ssh_privkey_file:
306            raise service_error(service_error.internal, 
307                    "No SSH public key file?")
308
309
310        if mapdb_file:
311            self.read_mapdb(mapdb_file)
312        else:
313            self.log.warn("[experiment_control] No testbed map, using defaults")
314            self.tbmap = { 
315                    'deter':'https://users.isi.deterlab.net:23235',
316                    'emulab':'https://users.isi.deterlab.net:23236',
317                    'ucb':'https://users.isi.deterlab.net:23237',
318                    }
319
320        if accessdb_file:
321                self.read_accessdb(accessdb_file)
322        else:
323            raise service_error(service_error.internal,
324                    "No accessdb specified in config")
325
326        # Grab saved state.  OK to do this w/o locking because it's read only
327        # and only one thread should be in existence that can see self.state at
328        # this point.
329        if self.state_filename:
330            self.read_state()
331
332        # Dispatch tables
333        self.soap_services = {\
334                'Create': soap_handler('Create', self.create_experiment),
335                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
336                'Vis': soap_handler('Vis', self.get_vis),
337                'Info': soap_handler('Info', self.get_info),
338                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
339                'Terminate': soap_handler('Terminate', 
340                    self.terminate_experiment),
341        }
342
343        self.xmlrpc_services = {\
344                'Create': xmlrpc_handler('Create', self.create_experiment),
345                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
346                'Vis': xmlrpc_handler('Vis', self.get_vis),
347                'Info': xmlrpc_handler('Info', self.get_info),
348                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
349                'Terminate': xmlrpc_handler('Terminate',
350                    self.terminate_experiment),
351        }
352
353    # Call while holding self.state_lock
354    def write_state(self):
355        """
356        Write a new copy of experiment state after copying the existing state
357        to a backup.
358
359        State format is a simple pickling of the state dictionary.
360        """
361        if os.access(self.state_filename, os.W_OK):
362            copy_file(self.state_filename, \
363                    "%s.bak" % self.state_filename)
364        try:
365            f = open(self.state_filename, 'w')
366            pickle.dump(self.state, f)
367        except IOError, e:
368            self.log.error("Can't write file %s: %s" % \
369                    (self.state_filename, e))
370        except pickle.PicklingError, e:
371            self.log.error("Pickling problem: %s" % e)
372        except TypeError, e:
373            self.log.error("Pickling problem (TypeError): %s" % e)
374
375    # Call while holding self.state_lock
376    def read_state(self):
377        """
378        Read a new copy of experiment state.  Old state is overwritten.
379
380        State format is a simple pickling of the state dictionary.
381        """
382       
383        def get_experiment_id(state):
384            """
385            Pull the fedid experimentID out of the saved state.  This is kind
386            of a gross walk through the dict.
387            """
388
389            if state.has_key('experimentID'):
390                for e in state['experimentID']:
391                    if e.has_key('fedid'):
392                        return e['fedid']
393                else:
394                    return None
395            else:
396                return None
397
398        def get_alloc_ids(state):
399            """
400            Pull the fedids of the identifiers of each allocation from the
401            state.  Again, a dict dive that's best isolated.
402            """
403
404            return [ f['allocID']['fedid'] 
405                    for f in state.get('federant',[]) \
406                        if f.has_key('allocID') and \
407                            f['allocID'].has_key('fedid')]
408
409
410        try:
411            f = open(self.state_filename, "r")
412            self.state = pickle.load(f)
413            self.log.debug("[read_state]: Read state from %s" % \
414                    self.state_filename)
415        except IOError, e:
416            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
417                    % (self.state_filename, e))
418        except pickle.UnpicklingError, e:
419            self.log.warning(("[read_state]: No saved state: " + \
420                    "Unpickling failed: %s") % e)
421       
422        for s in self.state.values():
423            try:
424
425                eid = get_experiment_id(s)
426                if eid : 
427                    # Give the owner rights to the experiment
428                    self.auth.set_attribute(s['owner'], eid)
429                    # And holders of the eid as well
430                    self.auth.set_attribute(eid, eid)
431                    # allow overrides to control experiments as well
432                    for o in self.overrides:
433                        self.auth.set_attribute(o, eid)
434                    # Set permissions to allow reading of the software repo, if
435                    # any, as well.
436                    for a in get_alloc_ids(s):
437                        self.auth.set_attribute(a, 'repo/%s' % eid)
438                else:
439                    raise KeyError("No experiment id")
440            except KeyError, e:
441                self.log.warning("[read_state]: State ownership or identity " +\
442                        "misformatted in %s: %s" % (self.state_filename, e))
443
444
445    def read_accessdb(self, accessdb_file):
446        """
447        Read the mapping from fedids that can create experiments to their name
448        in the 3-level access namespace.  All will be asserted from this
449        testbed and can include the local username and porject that will be
450        asserted on their behalf by this fedd.  Each fedid is also added to the
451        authorization system with the "create" attribute.
452        """
453        self.accessdb = {}
454        # These are the regexps for parsing the db
455        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
456        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
457                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
458        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
459                "\s*->\s*(" + name_expr + ")\s*$")
460        lineno = 0
461
462        # Parse the mappings and store in self.authdb, a dict of
463        # fedid -> (proj, user)
464        try:
465            f = open(accessdb_file, "r")
466            for line in f:
467                lineno += 1
468                line = line.strip()
469                if len(line) == 0 or line.startswith('#'):
470                    continue
471                m = project_line.match(line)
472                if m:
473                    fid = fedid(hexstr=m.group(1))
474                    project, user = m.group(2,3)
475                    if not self.accessdb.has_key(fid):
476                        self.accessdb[fid] = []
477                    self.accessdb[fid].append((project, user))
478                    continue
479
480                m = user_line.match(line)
481                if m:
482                    fid = fedid(hexstr=m.group(1))
483                    project = None
484                    user = m.group(2)
485                    if not self.accessdb.has_key(fid):
486                        self.accessdb[fid] = []
487                    self.accessdb[fid].append((project, user))
488                    continue
489                self.log.warn("[experiment_control] Error parsing access " +\
490                        "db %s at line %d" %  (accessdb_file, lineno))
491        except IOError:
492            raise service_error(service_error.internal,
493                    "Error opening/reading %s as experiment " +\
494                            "control accessdb" %  accessdb_file)
495        f.close()
496
497        # Initialize the authorization attributes
498        for fid in self.accessdb.keys():
499            self.auth.set_attribute(fid, 'create')
500
501    def read_mapdb(self, file):
502        """
503        Read a simple colon separated list of mappings for the
504        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
505        """
506
507        self.tbmap = { }
508        lineno =0
509        try:
510            f = open(file, "r")
511            for line in f:
512                lineno += 1
513                line = line.strip()
514                if line.startswith('#') or len(line) == 0:
515                    continue
516                try:
517                    label, url = line.split(':', 1)
518                    self.tbmap[label] = url
519                except ValueError, e:
520                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
521                            "map db: %s %s" % (lineno, line, e))
522        except IOError, e:
523            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
524                    "open %s: %s" % (file, e))
525        f.close()
526       
527    def generate_ssh_keys(self, dest, type="rsa" ):
528        """
529        Generate a set of keys for the gateways to use to talk.
530
531        Keys are of type type and are stored in the required dest file.
532        """
533        valid_types = ("rsa", "dsa")
534        t = type.lower();
535        if t not in valid_types: raise ValueError
536        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
537
538        try:
539            trace = open("/dev/null", "w")
540        except IOError:
541            raise service_error(service_error.internal,
542                    "Cannot open /dev/null??");
543
544        # May raise CalledProcessError
545        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
546        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
547        if rv != 0:
548            raise service_error(service_error.internal, 
549                    "Cannot generate nonce ssh keys.  %s return code %d" \
550                            % (self.ssh_keygen, rv))
551
552    def gentopo(self, str):
553        """
554        Generate the topology dtat structure from the splitter's XML
555        representation of it.
556
557        The topology XML looks like:
558            <experiment>
559                <nodes>
560                    <node><vname></vname><ips>ip1:ip2</ips></node>
561                </nodes>
562                <lans>
563                    <lan>
564                        <vname></vname><vnode></vnode><ip></ip>
565                        <bandwidth></bandwidth><member>node:port</member>
566                    </lan>
567                </lans>
568        """
569        class topo_parse:
570            """
571            Parse the topology XML and create the dats structure.
572            """
573            def __init__(self):
574                # Typing of the subelements for data conversion
575                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
576                self.int_subelements = ( 'bandwidth',)
577                self.float_subelements = ( 'delay',)
578                # The final data structure
579                self.nodes = [ ]
580                self.lans =  [ ]
581                self.topo = { \
582                        'node': self.nodes,\
583                        'lan' : self.lans,\
584                    }
585                self.element = { }  # Current element being created
586                self.chars = ""     # Last text seen
587
588            def end_element(self, name):
589                # After each sub element the contents is added to the current
590                # element or to the appropriate list.
591                if name == 'node':
592                    self.nodes.append(self.element)
593                    self.element = { }
594                elif name == 'lan':
595                    self.lans.append(self.element)
596                    self.element = { }
597                elif name in self.str_subelements:
598                    self.element[name] = self.chars
599                    self.chars = ""
600                elif name in self.int_subelements:
601                    self.element[name] = int(self.chars)
602                    self.chars = ""
603                elif name in self.float_subelements:
604                    self.element[name] = float(self.chars)
605                    self.chars = ""
606
607            def found_chars(self, data):
608                self.chars += data.rstrip()
609
610
611        tp = topo_parse();
612        parser = xml.parsers.expat.ParserCreate()
613        parser.EndElementHandler = tp.end_element
614        parser.CharacterDataHandler = tp.found_chars
615
616        parser.Parse(str)
617
618        return tp.topo
619       
620
621    def genviz(self, topo):
622        """
623        Generate the visualization the virtual topology
624        """
625
626        neato = "/usr/local/bin/neato"
627        # These are used to parse neato output and to create the visualization
628        # file.
629        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
630        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
631                "%s</type></node>"
632
633        try:
634            # Node names
635            nodes = [ n['vname'] for n in topo['node'] ]
636            topo_lans = topo['lan']
637        except KeyError, e:
638            raise service_error(service_error.internal, "Bad topology: %s" %e)
639
640        lans = { }
641        links = { }
642
643        # Walk through the virtual topology, organizing the connections into
644        # 2-node connections (links) and more-than-2-node connections (lans).
645        # When a lan is created, it's added to the list of nodes (there's a
646        # node in the visualization for the lan).
647        for l in topo_lans:
648            if links.has_key(l['vname']):
649                if len(links[l['vname']]) < 2:
650                    links[l['vname']].append(l['vnode'])
651                else:
652                    nodes.append(l['vname'])
653                    lans[l['vname']] = links[l['vname']]
654                    del links[l['vname']]
655                    lans[l['vname']].append(l['vnode'])
656            elif lans.has_key(l['vname']):
657                lans[l['vname']].append(l['vnode'])
658            else:
659                links[l['vname']] = [ l['vnode'] ]
660
661
662        # Open up a temporary file for dot to turn into a visualization
663        try:
664            df, dotname = tempfile.mkstemp()
665            dotfile = os.fdopen(df, 'w')
666        except IOError:
667            raise service_error(service_error.internal,
668                    "Failed to open file in genviz")
669
670        try:
671            dnull = open('/dev/null', 'w')
672        except IOError:
673            service_error(service_error.internal,
674                    "Failed to open /dev/null in genviz")
675
676        # Generate a dot/neato input file from the links, nodes and lans
677        try:
678            print >>dotfile, "graph G {"
679            for n in nodes:
680                print >>dotfile, '\t"%s"' % n
681            for l in links.keys():
682                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
683            for l in lans.keys():
684                for n in lans[l]:
685                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
686            print >>dotfile, "}"
687            dotfile.close()
688        except TypeError:
689            raise service_error(service_error.internal,
690                    "Single endpoint link in vtopo")
691        except IOError:
692            raise service_error(service_error.internal, "Cannot write dot file")
693
694        # Use dot to create a visualization
695        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
696                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
697                close_fds=True)
698        dnull.close()
699
700        # Translate dot to vis format
701        vis_nodes = [ ]
702        vis = { 'node': vis_nodes }
703        for line in dot.stdout:
704            m = vis_re.match(line)
705            if m:
706                vn = m.group(1)
707                vis_node = {'name': vn, \
708                        'x': float(m.group(2)),\
709                        'y' : float(m.group(3)),\
710                    }
711                if vn in links.keys() or vn in lans.keys():
712                    vis_node['type'] = 'lan'
713                else:
714                    vis_node['type'] = 'node'
715                vis_nodes.append(vis_node)
716        rv = dot.wait()
717
718        os.remove(dotname)
719        if rv == 0 : return vis
720        else: return None
721
722    def get_access(self, tb, nodes, user, tbparam, master, export_project,
723            access_user):
724        """
725        Get access to testbed through fedd and set the parameters for that tb
726        """
727        uri = self.tbmap.get(tb, None)
728        if not uri:
729            raise service_error(serice_error.server_config, 
730                    "Unknown testbed: %s" % tb)
731
732        # currently this lumps all users into one service access group
733        service_keys = [ a for u in user \
734                for a in u.get('access', []) \
735                    if a.has_key('sshPubkey')]
736
737        if len(service_keys) == 0:
738            raise service_error(service_error.req, 
739                    "Must have at least one SSH pubkey for services")
740
741
742        for p, u in access_user:
743            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
744                    "to %s") %  ((p or "None"), u, uri))
745
746            if p:
747                # Request with user and project specified
748                req = {\
749                        'destinationTestbed' : { 'uri' : uri },
750                        'project': { 
751                            'name': {'localname': p},
752                            'user': [ {'userID': { 'localname': u } } ],
753                            },
754                        'user':  user,
755                        'allocID' : { 'localname': 'test' },
756                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
757                        'serviceAccess' : service_keys
758                    }
759            else:
760                # Request with only user specified
761                req = {\
762                        'destinationTestbed' : { 'uri' : uri },
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
766                        'serviceAccess' : service_keys
767                    }
768
769            if tb == master:
770                # NB, the export_project parameter is a dict that includes
771                # the type
772                req['exportProject'] = export_project
773
774            # node resources if any
775            if nodes != None and len(nodes) > 0:
776                rnodes = [ ]
777                for n in nodes:
778                    rn = { }
779                    image, hw, count = n.split(":")
780                    if image: rn['image'] = [ image ]
781                    if hw: rn['hardware'] = [ hw ]
782                    if count and int(count) >0 : rn['count'] = int(count)
783                    rnodes.append(rn)
784                req['resources']= { }
785                req['resources']['node'] = rnodes
786
787            try:
788                if self.local_access.has_key(uri):
789                    # Local access call
790                    req = { 'RequestAccessRequestBody' : req }
791                    r = self.local_access[uri].RequestAccess(req, 
792                            fedid(file=self.cert_file))
793                    r = { 'RequestAccessResponseBody' : r }
794                else:
795                    r = self.call_RequestAccess(uri, req, 
796                            self.cert_file, self.cert_pwd, self.trusted_certs)
797            except service_error, e:
798                if e.code == service_error.access:
799                    self.log.debug("[get_access] Access denied")
800                    r = None
801                    continue
802                else:
803                    raise e
804
805            if r.has_key('RequestAccessResponseBody'):
806                # Through to here we have a valid response, not a fault.
807                # Access denied is a fault, so something better or worse than
808                # access denied has happened.
809                r = r['RequestAccessResponseBody']
810                self.log.debug("[get_access] Access granted")
811                break
812            else:
813                raise service_error(service_error.protocol,
814                        "Bad proxy response")
815       
816        if not r:
817            raise service_error(service_error.access, 
818                    "Access denied by %s (%s)" % (tb, uri))
819
820        if r.has_key('emulab'):
821            e = r['emulab']
822            p = e['project']
823            tbparam[tb] = { 
824                    "boss": e['boss'],
825                    "host": e['ops'],
826                    "domain": e['domain'],
827                    "fs": e['fileServer'],
828                    "eventserver": e['eventServer'],
829                    "project": unpack_id(p['name']),
830                    "emulab" : e,
831                    "allocID" : r['allocID'],
832                    "uri": uri,
833                    }
834            # Make the testbed name be the label the user applied
835            p['testbed'] = {'localname': tb }
836
837            for u in p['user']:
838                role = u.get('role', None)
839                if role == 'experimentCreation':
840                    tbparam[tb]['user'] = unpack_id(u['userID'])
841                    break
842            else:
843                raise service_error(service_error.internal, 
844                        "No createExperimentUser from %s" %tb)
845            # Add attributes to parameter space.  We don't allow attributes to
846            # overlay any parameters already installed.
847            for a in e['fedAttr']:
848                try:
849                    if a['attribute'] and \
850                            isinstance(a['attribute'], basestring)\
851                            and not tbparam[tb].has_key(a['attribute'].lower()):
852                        tbparam[tb][a['attribute'].lower()] = a['value']
853                except KeyError:
854                    self.log.error("Bad attribute in response: %s" % a)
855        else:
856            tbparam[tb] = { 
857                "allocID" : r['allocID'],
858                "uri": uri,
859            }
860
861       
862    def release_access(self, tb, aid, uri=None):
863        """
864        Release access to testbed through fedd
865        """
866
867        if not uri:
868            uri = self.tbmap.get(tb, None)
869        if not uri:
870            raise service_error(service_error.server_config, 
871                    "Unknown testbed: %s" % tb)
872
873        if self.local_access.has_key(uri):
874            resp = self.local_access[uri].ReleaseAccess(\
875                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
876                    fedid(file=self.cert_file))
877            resp = { 'ReleaseAccessResponseBody': resp } 
878        else:
879            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
880                    self.cert_file, self.cert_pwd, self.trusted_certs)
881
882        # better error coding
883
884    def remote_splitter(self, uri, desc, master):
885
886        req = {
887                'description' : { 'ns2description': desc },
888                'master': master,
889                'include_fedkit': bool(self.fedkit),
890                'include_gatewaykit': bool(self.gatewaykit)
891            }
892
893        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
894                self.trusted_certs)
895
896        if r.has_key('Ns2SplitResponseBody'):
897            r = r['Ns2SplitResponseBody']
898            if r.has_key('output'):
899                return r['output'].splitlines()
900            else:
901                raise service_error(service_error.protocol, 
902                        "Bad splitter response (no output)")
903        else:
904            raise service_error(service_error.protocol, "Bad splitter response")
905
906    class start_segment:
907        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
908                cert_pwd=None, trusted_certs=None, caller=None,
909                log_collector=None):
910            self.log = log
911            self.debug = debug
912            self.cert_file = cert_file
913            self.cert_pwd = cert_pwd
914            self.trusted_certs = None
915            self.caller = caller
916            self.testbed = testbed
917            self.log_collector = log_collector
918            self.response = None
919
920        def __call__(self, uri, aid, topo, master, attrs=None):
921            req = {
922                    'allocID': { 'fedid' : aid }, 
923                    'segmentdescription': { 
924                        'topdldescription': topo.to_dict(),
925                    },
926                    'master': master,
927                }
928            if attrs:
929                req['fedAttr'] = attrs
930
931            try:
932                self.log.debug("Calling StartSegment at %s " % uri)
933                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
934                        self.trusted_certs)
935                if r.has_key('StartSegmentResponseBody'):
936                    lval = r['StartSegmentResponseBody'].get('allocationLog',
937                            None)
938                    if lval and self.log_collector:
939                        for line in  lval.splitlines(True):
940                            self.log_collector.write(line)
941                    self.response = r
942                else:
943                    raise service_error(service_error.internal, 
944                            "Bad response!?: %s" %r)
945                return True
946            except service_error, e:
947                self.log.error("Start segment failed on %s: %s" % \
948                        (self.testbed, e))
949                return False
950
951
952
953    class terminate_segment:
954        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
955                cert_pwd=None, trusted_certs=None, caller=None):
956            self.log = log
957            self.debug = debug
958            self.cert_file = cert_file
959            self.cert_pwd = cert_pwd
960            self.trusted_certs = None
961            self.caller = caller
962            self.testbed = testbed
963
964        def __call__(self, uri, aid ):
965            req = {
966                    'allocID': aid , 
967                }
968            try:
969                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
970                        self.trusted_certs)
971                return True
972            except service_error, e:
973                self.log.error("Terminate segment failed on %s: %s" % \
974                        (self.testbed, e))
975                return False
976   
977
978    def allocate_resources(self, allocated, master, eid, expid, expcert, 
979            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
980            attrs=None):
981        def get_vlan(r):
982            if r.has_key('StartSegmentResponseBody'):
983                srb = r['StartSegmentResponseBody']
984                if srb.has_key('fedAttr'):
985                    for k, v in [ (a['attribute'], a['value']) \
986                            for a in srb['fedAttr']]:
987                        if k == 'vlan': return v
988            return None
989
990        started = { }           # Testbeds where a sub-experiment started
991                                # successfully
992
993        # XXX
994        fail_soft = False
995
996        slaves = [ k for k in allocated.keys() \
997                if k != master and not topo[k].get_attribute('transit')]
998        transit = [ k for k in allocated.keys() \
999                if topo[k].get_attribute('transit')]
1000
1001        log = alloc_log or self.log
1002
1003        thread_pool = self.thread_pool(self.nthreads)
1004        threads = [ ]
1005
1006        for tb in transit:
1007            uri = tbparams[tb]['uri']
1008            if tbparams[tb].has_key('allocID') and \
1009                    tbparams[tb]['allocID'].has_key('fedid'):
1010                aid = tbparams[tb]['allocID']['fedid']
1011            else:
1012                raise service_error(service_error.internal, 
1013                        "No alloc id for testbed %s !?" % tb)
1014
1015            m = re.search('(\d+)', tb)
1016            if m:
1017                to_repl = "unassigned%s" % m.group(1)
1018            else:
1019                raise service_error(service_error.internal, 
1020                        "Bad dynamic allocation name")
1021                break
1022
1023            ss = self.start_segment(log=log, debug=self.debug, 
1024                testbed=master, cert_file=self.cert_file, 
1025                cert_pwd=self.cert_pwd, 
1026                trusted_certs=self.trusted_certs,
1027                caller=self.call_StartSegment,
1028                log_collector=log_collector)
1029            t = self.pooled_thread(
1030                    target=ss,
1031                    args =(uri, aid, topo[tb], False, attrs),
1032                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1033            threads.append(t)
1034            t.start()
1035            # Wait until the this transit node finishes (keep pinging the log,
1036            # though)
1037
1038            mins = 0
1039            while not thread_pool.wait_for_all_done(60.0):
1040                mins += 1
1041                alloc_log.info("Waiting for master (it has been %d mins)" \
1042                        % mins)
1043
1044            if t.rv:
1045                vlan = get_vlan(ss.response)
1046                if vlan is not None:
1047                    for k, t in topo.items():
1048                        for e in t.elements:
1049                            for i in e.interface:
1050                                vl = i.get_attribute('dragon_vlan')
1051                                if vl is not None and vl == to_repl:
1052                                    i.set_attribute('dragon_vlan', vlan)
1053            else:
1054                break
1055            thread_pool.clear()
1056
1057
1058        failed = [ t.getName() for t in threads if not t.rv ]
1059
1060        if len(failed) == 0:
1061            for tb in slaves:
1062                # Create and start a thread to start the segment, and save it
1063                # to get the return value later
1064                thread_pool.wait_for_slot()
1065                uri = self.tbmap.get(tb, None)
1066                if not uri:
1067                    raise service_error(service_error.internal, 
1068                            "Unknown testbed %s !?" % tb)
1069
1070                if tbparams[tb].has_key('allocID') and \
1071                        tbparams[tb]['allocID'].has_key('fedid'):
1072                    aid = tbparams[tb]['allocID']['fedid']
1073                else:
1074                    raise service_error(service_error.internal, 
1075                            "No alloc id for testbed %s !?" % tb)
1076
1077                t  = self.pooled_thread(\
1078                        target=self.start_segment(log=log, debug=self.debug,
1079                            testbed=tb, cert_file=self.cert_file,
1080                            cert_pwd=self.cert_pwd,
1081                            trusted_certs=self.trusted_certs,
1082                            caller=self.call_StartSegment,
1083                            log_collector=log_collector), 
1084                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1085                        pdata=thread_pool, trace_file=self.trace_file)
1086                threads.append(t)
1087                t.start()
1088
1089            # Wait until all finish (keep pinging the log, though)
1090            mins = 0
1091            while not thread_pool.wait_for_all_done(60.0):
1092                mins += 1
1093                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1094                        % mins)
1095
1096            thread_pool.clear()
1097
1098        # If none failed, start the master
1099        failed = [ t.getName() for t in threads if not t.rv ]
1100
1101        if len(failed) == 0:
1102            uri = self.tbmap.get(master, None)
1103            if not uri:
1104                raise service_error(service_error.internal, 
1105                        "Unknown testbed %s !?" % master)
1106
1107            if tbparams[master].has_key('allocID') and \
1108                    tbparams[master]['allocID'].has_key('fedid'):
1109                aid = tbparams[master]['allocID']['fedid']
1110            else:
1111                raise service_error(service_error.internal, 
1112                    "No alloc id for testbed %s !?" % master)
1113            t = self.pooled_thread(
1114                    target=self.start_segment(log=log, debug=self.debug, 
1115                        testbed=master, cert_file=self.cert_file, 
1116                        cert_pwd=self.cert_pwd, 
1117                        trusted_certs=self.trusted_certs,
1118                        caller=self.call_StartSegment,
1119                        log_collector=log_collector),
1120                    args =(uri, aid, topo[master], True, attrs),
1121                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1122            threads.append(t)
1123            t.start()
1124            # Wait until the master finishes (keep pinging the log, though)
1125            mins = 0
1126            while not thread_pool.wait_for_all_done(60.0):
1127                mins += 1
1128                alloc_log.info("Waiting for master (it has been %d mins)" \
1129                        % mins)
1130            # update failed to include the master, if it failed
1131            failed = [ t.getName() for t in threads if not t.rv ]
1132
1133        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1134        # If one failed clean up, unless fail_soft is set
1135        if failed:
1136            if not fail_soft:
1137                thread_pool.clear()
1138                for tb in succeeded:
1139                    # Create and start a thread to stop the segment
1140                    thread_pool.wait_for_slot()
1141                    uri = self.tbmap.get(tb, None)
1142                    t  = self.pooled_thread(\
1143                            target=self.terminate_segment(log=log,
1144                                testbed=tb,
1145                                cert_file=self.cert_file, 
1146                                cert_pwd=self.cert_pwd,
1147                                trusted_certs=self.trusted_certs,
1148                                caller=self.call_TerminateSegment),
1149                            args=(uri, tbparams[tb]['federant']['allocID']),
1150                            name=tb,
1151                            pdata=thread_pool, trace_file=self.trace_file)
1152                    t.start()
1153                # Wait until all finish
1154                thread_pool.wait_for_all_done()
1155
1156                # release the allocations
1157                for tb in tbparams.keys():
1158                    self.release_access(tb, tbparams[tb]['allocID'],
1159                            tbparams[tb].get('uri', None))
1160                # Remove the placeholder
1161                self.state_lock.acquire()
1162                self.state[eid]['experimentStatus'] = 'failed'
1163                if self.state_filename: self.write_state()
1164                self.state_lock.release()
1165
1166                log.error("Swap in failed on %s" % ",".join(failed))
1167                return
1168        else:
1169            log.info("[start_segment]: Experiment %s active" % eid)
1170
1171
1172        # Walk up tmpdir, deleting as we go
1173        if self.cleanup:
1174            log.debug("[start_experiment]: removing %s" % tmpdir)
1175            for path, dirs, files in os.walk(tmpdir, topdown=False):
1176                for f in files:
1177                    os.remove(os.path.join(path, f))
1178                for d in dirs:
1179                    os.rmdir(os.path.join(path, d))
1180            os.rmdir(tmpdir)
1181        else:
1182            log.debug("[start_experiment]: not removing %s" % tmpdir)
1183
1184        # Insert the experiment into our state and update the disk copy
1185        self.state_lock.acquire()
1186        self.state[expid]['experimentStatus'] = 'active'
1187        self.state[eid] = self.state[expid]
1188        if self.state_filename: self.write_state()
1189        self.state_lock.release()
1190        return
1191
1192
1193    def add_kit(self, e, kit):
1194        """
1195        Add a Software object created from the list of (install, location)
1196        tuples passed as kit  to the software attribute of an object e.  We
1197        do this enough to break out the code, but it's kind of a hack to
1198        avoid changing the old tuple rep.
1199        """
1200
1201        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1202
1203        if isinstance(e.software, list): e.software.extend(s)
1204        else: e.software = s
1205
1206
1207    def create_experiment_state(self, fid, req, expid, expcert):
1208        """
1209        Create the initial entry in the experiment's state.  The expid and
1210        expcert are the experiment's fedid and certifacte that represents that
1211        ID, which are installed in the experiment state.  If the request
1212        includes a suggested local name that is used if possible.  If the local
1213        name is already taken by an experiment owned by this user that has
1214        failed, it is overwriutten.  Otherwise new letters are added until a
1215        valid localname is found.  The generated local name is returned.
1216        """
1217
1218        if req.has_key('experimentID') and \
1219                req['experimentID'].has_key('localname'):
1220            overwrite = False
1221            eid = req['experimentID']['localname']
1222            # If there's an old failed experiment here with the same local name
1223            # and accessible by this user, we'll overwrite it, otherwise we'll
1224            # fall through and do the collision avoidance.
1225            old_expid = self.get_experiment_fedid(eid)
1226            if old_expid and self.check_experiment_access(fid, old_expid):
1227                self.state_lock.acquire()
1228                status = self.state[eid].get('experimentStatus', None)
1229                if status and status == 'failed':
1230                    # remove the old access attribute
1231                    self.auth.unset_attribute(fid, old_expid)
1232                    overwrite = True
1233                    del self.state[eid]
1234                    del self.state[old_expid]
1235                self.state_lock.release()
1236            self.state_lock.acquire()
1237            while (self.state.has_key(eid) and not overwrite):
1238                eid += random.choice(string.ascii_letters)
1239            # Initial state
1240            self.state[eid] = {
1241                    'experimentID' : \
1242                            [ { 'localname' : eid }, {'fedid': expid } ],
1243                    'experimentStatus': 'starting',
1244                    'experimentAccess': { 'X509' : expcert },
1245                    'owner': fid,
1246                    'log' : [],
1247                }
1248            self.state[expid] = self.state[eid]
1249            if self.state_filename: self.write_state()
1250            self.state_lock.release()
1251        else:
1252            eid = self.exp_stem
1253            for i in range(0,5):
1254                eid += random.choice(string.ascii_letters)
1255            self.state_lock.acquire()
1256            while (self.state.has_key(eid)):
1257                eid = self.exp_stem
1258                for i in range(0,5):
1259                    eid += random.choice(string.ascii_letters)
1260            # Initial state
1261            self.state[eid] = {
1262                    'experimentID' : \
1263                            [ { 'localname' : eid }, {'fedid': expid } ],
1264                    'experimentStatus': 'starting',
1265                    'experimentAccess': { 'X509' : expcert },
1266                    'owner': fid,
1267                    'log' : [],
1268                }
1269            self.state[expid] = self.state[eid]
1270            if self.state_filename: self.write_state()
1271            self.state_lock.release()
1272
1273        return eid
1274
1275
1276    def allocate_ips_to_topo(self, top):
1277        """
1278        Add an ip4_address attribute to all the hosts in the topology, based on
1279        the shared substrates on which they sit.  An /etc/hosts file is also
1280        created and returned as a list of hostfiles entries.  We also return
1281        the allocator, because we may need to allocate IPs to portals
1282        (specifically DRAGON portals).
1283        """
1284        subs = sorted(top.substrates, 
1285                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1286                reverse=True)
1287        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1288        ifs = { }
1289        hosts = [ ]
1290
1291        for idx, s in enumerate(subs):
1292            a = ips.allocate(len(s.interfaces)+2)
1293            if a :
1294                base, num = a
1295                if num < len(s.interfaces) +2 : 
1296                    raise service_error(service_error.internal,
1297                            "Allocator returned wrong number of IPs??")
1298            else:
1299                raise service_error(service_error.req, 
1300                        "Cannot allocate IP addresses")
1301
1302            base += 1
1303            for i in s.interfaces:
1304                i.attribute.append(
1305                        topdl.Attribute('ip4_address', 
1306                            "%s" % ip_addr(base)))
1307                hname = i.element.name[0]
1308                if ifs.has_key(hname):
1309                    hosts.append("%s\t%s-%s %s-%d" % \
1310                            (ip_addr(base), hname, s.name, hname,
1311                                ifs[hname]))
1312                else:
1313                    ifs[hname] = 0
1314                    hosts.append("%s\t%s-%s %s-%d %s" % \
1315                            (ip_addr(base), hname, s.name, hname,
1316                                ifs[hname], hname))
1317
1318                ifs[hname] += 1
1319                base += 1
1320        return hosts, ips
1321
1322    def get_access_to_testbeds(self, testbeds, user, access_user, 
1323            export_project, master, allocated, tbparams):
1324        """
1325        Request access to the various testbeds required for this instantiation
1326        (passed in as testbeds).  User, access_user, expoert_project and master
1327        are used to construct the correct requests.  Per-testbed parameters are
1328        returned in tbparams.
1329        """
1330        for tb in testbeds:
1331            self.get_access(tb, None, user, tbparams, master,
1332                    export_project, access_user)
1333            allocated[tb] = 1
1334
1335    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1336        """
1337        Create the sub-topologies that are needed for experimetn instantiation.
1338        Along the way attach startup commands to the computers in the
1339        subtopologies.
1340        """
1341        for tb in testbeds:
1342            topo[tb] = top.clone()
1343            to_delete = [ ]
1344            for e in topo[tb].elements:
1345                etb = e.get_attribute('testbed')
1346                if etb and etb != tb:
1347                    for i in e.interface:
1348                        for s in i.subs:
1349                            try:
1350                                s.interfaces.remove(i)
1351                            except ValueError:
1352                                raise service_error(service_error.internal,
1353                                        "Can't remove interface??")
1354                    to_delete.append(e)
1355            for e in to_delete:
1356                topo[tb].elements.remove(e)
1357            topo[tb].make_indices()
1358
1359            for e in [ e for e in topo[tb].elements \
1360                    if isinstance(e,topdl.Computer)]:
1361                if tb == master:
1362                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1363                else:
1364                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1365                scmd = e.get_attribute('startup')
1366                if scmd:
1367                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1368
1369                e.set_attribute('startup', cmd)
1370                if self.fedkit: self.add_kit(e, self.fedkit)
1371
1372    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1373            portal_type, iface_desc=()):
1374        sproject = tbparams[st].get('project', 'project')
1375        dproject = tbparams[dt].get('project', 'project')
1376        mproject = tbparams[master].get('project', 'project')
1377        sdomain = tbparams[st].get('domain', ".example.com")
1378        ddomain = tbparams[dt].get('domain', ".example.com")
1379        mdomain = tbparams[master].get('domain', '.example.com')
1380        muser = tbparams[master].get('user', 'root')
1381        smbshare = tbparams[master].get('smbshare', 'USERS')
1382        aid = tbparams[dt]['allocID']['fedid']
1383        if st == master or dt == master:
1384            active = ("%s" % (st == master))
1385        else:
1386            active = ("%s" %(st > dt))
1387
1388        ifaces = [ ]
1389        for sub, attrs in iface_desc:
1390            inf = topdl.Interface(
1391                    substrate=sub,
1392                    attribute=[
1393                        topdl.Attribute(
1394                            attribute=n,
1395                            value = v)
1396                        for n, v in attrs
1397                        ]
1398                    )
1399            ifaces.append(inf)
1400        return topdl.Computer(
1401                name=myname,
1402                attribute=[ 
1403                    topdl.Attribute(attribute=n,value=v)
1404                        for n, v in (\
1405                            ('portal', 'true'),
1406                            ('domain', sdomain),
1407                            ('masterdomain', mdomain),
1408                            ('masterexperiment', "%s/%s" % \
1409                                    (mproject, eid)),
1410                            ('masteruser', muser),
1411                            ('smbshare', smbshare),
1412                            ('experiment', "%s/%s" % \
1413                                    (sproject, eid)),
1414                            ('peer', "%s" % desthost),
1415                            ('peer_segment', "%s" % aid),
1416                            ('scriptdir', 
1417                                "/usr/local/federation/bin"),
1418                            ('active', "%s" % active),
1419                            ('portal_type', portal_type), 
1420                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1421                    ],
1422                interface=ifaces,
1423                )
1424
1425    def new_portal_substrate(self, st, dt, eid, tbparams):
1426        ddomain = tbparams[dt].get('domain', ".example.com")
1427        dproject = tbparams[dt].get('project', 'project')
1428        tsubstrate = \
1429                topdl.Substrate(name='%s-%s' % (st, dt),
1430                        attribute= [
1431                            topdl.Attribute(
1432                                attribute='portal',
1433                                value='true')
1434                            ]
1435                        )
1436        segment_element = topdl.Segment(
1437                id= tbparams[dt]['allocID'],
1438                type='emulab',
1439                uri = self.tbmap.get(dt, None),
1440                interface=[ 
1441                    topdl.Interface(
1442                        substrate=tsubstrate.name),
1443                    ],
1444                attribute = [
1445                    topdl.Attribute(attribute=n, value=v)
1446                        for n, v in (\
1447                            ('domain', ddomain),
1448                            ('experiment', "%s/%s" % \
1449                                    (dproject, eid)),)
1450                    ],
1451                )
1452
1453        return (tsubstrate, segment_element)
1454
1455    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1456        if sub.capacity is None:
1457            raise service_error(service_error.internal,
1458                    "Cannot DRAGON split substrate w/o capacity")
1459        segs = [ ]
1460        substr = topdl.Substrate(name="dragon%d" % idx, 
1461                capacity=sub.capacity.clone(),
1462                attribute=[ topdl.Attribute(attribute=n, value=v)
1463                    for n, v, in (\
1464                            ('vlan', 'unassigned%d' % idx),)])
1465        for tb in tbs.keys():
1466            seg = topdl.Segment(
1467                    id = tbparams[tb]['allocID'],
1468                    type='emulab',
1469                    uri = self.tbmap.get(tb, None),
1470                    interface=[ 
1471                        topdl.Interface(
1472                            substrate=substr.name),
1473                        ],
1474                    attribute=[ topdl.Attribute(
1475                        attribute='dragon_endpoint', value='true'),
1476                        ]
1477                    )
1478            if tbparams[tb].has_key('vlans'):
1479                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1480            segs.append(seg)
1481
1482        topo["dragon%d" %idx] = \
1483                topdl.Topology(substrates=[substr], elements=segs,
1484                        attribute=[
1485                            topdl.Attribute(attribute="transit", value='true'),
1486                            topdl.Attribute(attribute="dynamic", value='true'),
1487                            topdl.Attribute(attribute="testbed", value='dragon'),
1488                            ]
1489                        )
1490
1491    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1492        """
1493        Add attribiutes to the various elements indicating that they are to be
1494        dragon connected and create a dragon segment in tops to be
1495        instantiated.
1496        """
1497
1498        def get_substrate_from_topo(name, t):
1499            for s in t.substrates:
1500                if s.name == name: return s
1501            else: return None
1502
1503        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1504        elements = [ i.element for i in sub.interfaces ]
1505        count = { }
1506        for e in elements:
1507            tb = e.get_attribute('testbed')
1508            count[tb] = count.get(tb, 0) + 1
1509
1510        for tb in tbs.keys():
1511            s = get_substrate_from_topo(sub.name, topo[tb])
1512            if s:
1513                for i in s.interfaces:
1514                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1515                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1516                    else: i.set_attribute('dragon_type', 'link')
1517            else:
1518                raise service_error(service_error.internal,
1519                        "No substrate %s in testbed %s" % (sub.name, tb))
1520
1521        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1522
1523    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1524            segment_substrate, portals):
1525        # More than one testbed is on this substrate.  Insert
1526        # some portals into the subtopologies.  st == source testbed,
1527        # dt == destination testbed.
1528        for st in tbs.keys():
1529            if not segment_substrate.has_key(st):
1530                segment_substrate[st] = { }
1531            if not portals.has_key(st): 
1532                portals[st] = { }
1533            for dt in [ t for t in tbs.keys() if t != st]:
1534                sproject = tbparams[st].get('project', 'project')
1535                dproject = tbparams[dt].get('project', 'project')
1536                mproject = tbparams[master].get('project', 'project')
1537                sdomain = tbparams[st].get('domain', ".example.com")
1538                ddomain = tbparams[dt].get('domain', ".example.com")
1539                mdomain = tbparams[master].get('domain', '.example.com')
1540                muser = tbparams[master].get('user', 'root')
1541                smbshare = tbparams[master].get('smbshare', 'USERS')
1542                aid = tbparams[dt]['allocID']['fedid']
1543                if st == master or dt == master:
1544                    active = ("%s" % (st == master))
1545                else:
1546                    active = ("%s" %(st > dt))
1547                if not segment_substrate[st].has_key(dt):
1548                    # Put a substrate and a segment for the connected
1549                    # testbed in there.
1550                    tsubstrate, segment_element = \
1551                            self.new_portal_substrate(st, dt, eid, tbparams)
1552                    segment_substrate[st][dt] = tsubstrate
1553                    topo[st].substrates.append(tsubstrate)
1554                    topo[st].elements.append(segment_element)
1555
1556                new_portal = False
1557                if portals[st].has_key(dt):
1558                    # There's a portal set up to go to this destination.
1559                    # See if there's room to multiples this connection on
1560                    # it.  If so, add an interface to the portal; if not,
1561                    # set up to add a portal below.
1562                    # [This little festival of braces is just a pop of the
1563                    # last element in the list of portals between st and
1564                    # dt.]
1565                    portal = portals[st][dt][-1]
1566                    mux = len([ i for i in portal.interface \
1567                            if not i.get_attribute('portal')])
1568                    if mux == self.muxmax:
1569                        new_portal = True
1570                        portal_type = "experiment"
1571                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1572                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1573                    else:
1574                        new_i = topdl.Interface(
1575                                substrate=s.name,
1576                                attribute=[ 
1577                                    topdl.Attribute(
1578                                        attribute='ip4_address', 
1579                                        value=tbs[dt]
1580                                    )
1581                                ])
1582                        portal.interface.append(new_i)
1583                else:
1584                    # First connection to this testbed, make an empty list
1585                    # and set up to add the new portal below
1586                    new_portal = True
1587                    portals[st][dt] = [ ]
1588                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1589                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1590
1591                    if dt == master or st == master: portal_type = "both"
1592                    else: portal_type = "experiment"
1593
1594                if new_portal:
1595                    infs = (
1596                            (segment_substrate[st][dt].name, 
1597                                (('portal', 'true'),)),
1598                            (sub.name, 
1599                                (('ip4_address', tbs[dt]),))
1600                        )
1601                    portal =  self.new_portal_node(st, dt, tbparams, 
1602                            master, eid, myname, desthost, portal_type,
1603                            infs)
1604                    if self.fedkit:
1605                        self.add_kit(portal, self.fedkit)
1606                    if self.gatewaykit: 
1607                        self.add_kit(portal, self.gatewaykit)
1608
1609                    topo[st].elements.append(portal)
1610                    portals[st][dt].append(portal)
1611
1612    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1613        # Add to the master testbed
1614        tsubstrate, segment_element = \
1615                self.new_portal_substrate(st, dt, eid, tbparams)
1616        myname = "%stunnel" % dt
1617        desthost = "%stunnel" % st
1618
1619        portal = self.new_portal_node(st, dt, tbparams, master,
1620                eid, myname, desthost, "control", 
1621                ((tsubstrate.name,(('portal','true'),)),))
1622        if self.fedkit:
1623            self.add_kit(portal, self.fedkit)
1624        if self.gatewaykit: 
1625            self.add_kit(portal, self.gatewaykit)
1626
1627        topo[st].substrates.append(tsubstrate)
1628        topo[st].elements.append(segment_element)
1629        topo[st].elements.append(portal)
1630
1631    def new_dragon_portal(self, st, dt, master, eid, dip, idx, 
1632            substrate, tbparams):
1633        # Add to the master testbed
1634        myname = "%stunnel" % dt
1635        desthost = "%s" % ip_addr(dip)
1636
1637        portal = self.new_portal_node(st, dt, tbparams, master,
1638                eid, myname, desthost, "control", 
1639                ((substrate.name,(
1640                    ('portal','true'),
1641                    ('ip4_address', "%s" % ip_addr(dip)),
1642                    ('dragon_vlan', 'unassigned%d' % idx),
1643                    ('dragon_type', 'link'),)),))
1644        if self.fedkit:
1645            self.add_kit(portal, self.fedkit)
1646        if self.gatewaykit: 
1647            self.add_kit(portal, self.gatewaykit)
1648
1649        return portal
1650
1651    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1652        """
1653        For each substrate in the main topology, find those that
1654        have nodes on more than one testbed.  Insert portal nodes
1655        into the copies of those substrates on the sub topologies.
1656        """
1657        segment_substrate = { }
1658        portals = { }
1659        for s in top.substrates:
1660            # tbs will contain an ip address on this subsrate that is in
1661            # each testbed.
1662            tbs = { }
1663            for i in s.interfaces:
1664                e = i.element
1665                tb = e.get_attribute('testbed')
1666                if tb and not tbs.has_key(tb):
1667                    for i in e.interface:
1668                        if s in i.subs:
1669                            tbs[tb]= i.get_attribute('ip4_address')
1670            if len(tbs) < 2:
1671                continue
1672
1673            # DRAGON will not create multi-site vlans yet
1674            if len(tbs) == 2 and \
1675                    all([tbparams[x].has_key('dragon') for x in tbs]):
1676                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1677                        master, eid)
1678            else:
1679                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1680                        eid, segment_substrate, portals)
1681
1682        # Make sure that all the slaves have a control portal back to the
1683        # master.
1684        for tb in [ t for t in tbparams.keys() if t != master ]:
1685            if len([e for e in topo[tb].elements \
1686                    if isinstance(e, topdl.Computer) and \
1687                    e.get_attribute('portal') and \
1688                    e.get_attribute('portal_type') == 'both']) == 0:
1689
1690                if tbparams[master].has_key('dragon') \
1691                        and tbparams[tb].has_key('dragon'):
1692
1693                    idx = len([x for x in topo.keys() \
1694                            if x.startswith('dragon')])
1695                    dip, leng = ip_allocator.allocate(4)
1696                    dip += 1
1697                    csub = topdl.Substrate(
1698                            name="dragon-control-%s" % tb,
1699                            capacity=topdl.Capacity(100000.0, 'max'),
1700                            attribute=[
1701                                topdl.Attribute(
1702                                    attribute='portal',
1703                                    value='true'
1704                                    )
1705                                ]
1706                            )
1707                    seg = topdl.Segment(
1708                            id= tbparams[master]['allocID'],
1709                            type='emulab',
1710                            uri = self.tbmap.get(master, None),
1711                            interface=[ 
1712                                topdl.Interface(
1713                                    substrate=csub.name),
1714                                ],
1715                            attribute = [
1716                                topdl.Attribute(attribute=n, value=v)
1717                                    for n, v in (\
1718                                        ('domain', 
1719                                            tbparams[master].get('domain',
1720                                                ".example.com")),
1721                                        ('experiment', "%s/%s" % \
1722                                                (tbparams[master].get(
1723                                                    'project', 
1724                                                    'project'), 
1725                                                    eid)),)
1726                                ],
1727                            )
1728                    topo[tb].substrates.append(csub)
1729                    topo[tb].elements.append(
1730                            self.new_dragon_portal(tb, master, master, eid, 
1731                                dip, idx, csub, tbparams))
1732                    topo[tb].elements.append(seg)
1733
1734                    dip+=1
1735                    mcsub = csub.clone()
1736                    seg = topdl.Segment(
1737                            id= tbparams[tb]['allocID'],
1738                            type='emulab',
1739                            uri = self.tbmap.get(tb, None),
1740                            interface=[ 
1741                                topdl.Interface(
1742                                    substrate=csub.name),
1743                                ],
1744                            attribute = [
1745                                topdl.Attribute(attribute=n, value=v)
1746                                    for n, v in (\
1747                                        ('domain', 
1748                                            tbparams[tb].get('domain',
1749                                                ".example.com")),
1750                                        ('experiment', "%s/%s" % \
1751                                                (tbparams[tb].get('project', 
1752                                                    'project'), 
1753                                                    eid)),)
1754                                ],
1755                            )
1756                    topo[master].substrates.append(mcsub)
1757                    topo[master].elements.append(
1758                            self.new_dragon_portal(master, tb, master, eid, 
1759                                dip, idx, mcsub, tbparams))
1760                    topo[master].elements.append(seg)
1761
1762                    self.create_dragon_substrate(csub, topo, 
1763                            {tb: 1, master:1}, tbparams, master, eid)
1764                else:
1765                    self.add_control_portal(master, tb, master, eid, topo, 
1766                            tbparams)
1767                    self.add_control_portal(tb, master, master, eid, topo, 
1768                            tbparams)
1769
1770        # Connect the portal nodes into the topologies and clear out
1771        # substrates that are not in the topologies
1772        for tb in tbparams.keys():
1773            topo[tb].incorporate_elements()
1774            topo[tb].substrates = \
1775                    [s for s in topo[tb].substrates \
1776                        if len(s.interfaces) >0]
1777
1778    def wrangle_software(self, expid, top, topo, tbparams):
1779        """
1780        Copy software out to the repository directory, allocate permissions and
1781        rewrite the segment topologies to look for the software in local
1782        places.
1783        """
1784
1785        # Copy the rpms and tarfiles to a distribution directory from
1786        # which the federants can retrieve them
1787        linkpath = "%s/software" %  expid
1788        softdir ="%s/%s" % ( self.repodir, linkpath)
1789        softmap = { }
1790        # These are in a list of tuples format (each kit).  This comprehension
1791        # unwraps them into a single list of tuples that initilaizes the set of
1792        # tuples.
1793        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1794                for p, t in l ])
1795        pkgs.update([x.location for e in top.elements \
1796                for x in e.software])
1797        try:
1798            os.makedirs(softdir)
1799        except IOError, e:
1800            raise service_error(
1801                    "Cannot create software directory: %s" % e)
1802        # The actual copying.  Everything's converted into a url for copying.
1803        for pkg in pkgs:
1804            loc = pkg
1805
1806            scheme, host, path = urlparse(loc)[0:3]
1807            dest = os.path.basename(path)
1808            if not scheme:
1809                if not loc.startswith('/'):
1810                    loc = "/%s" % loc
1811                loc = "file://%s" %loc
1812            try:
1813                u = urlopen(loc)
1814            except Exception, e:
1815                raise service_error(service_error.req, 
1816                        "Cannot open %s: %s" % (loc, e))
1817            try:
1818                f = open("%s/%s" % (softdir, dest) , "w")
1819                self.log.debug("Writing %s/%s" % (softdir,dest) )
1820                data = u.read(4096)
1821                while data:
1822                    f.write(data)
1823                    data = u.read(4096)
1824                f.close()
1825                u.close()
1826            except Exception, e:
1827                raise service_error(service_error.internal,
1828                        "Could not copy %s: %s" % (loc, e))
1829            path = re.sub("/tmp", "", linkpath)
1830            # XXX
1831            softmap[pkg] = \
1832                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1833                    ( path, dest)
1834
1835            # Allow the individual segments to access the software.
1836            for tb in tbparams.keys():
1837                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1838                        "/%s/%s" % ( path, dest))
1839
1840        # Convert the software locations in the segments into the local
1841        # copies on this host
1842        for soft in [ s for tb in topo.values() \
1843                for e in tb.elements \
1844                    if getattr(e, 'software', False) \
1845                        for s in e.software ]:
1846            if softmap.has_key(soft.location):
1847                soft.location = softmap[soft.location]
1848
1849
1850    def create_experiment(self, req, fid):
1851        """
1852        The external interface to experiment creation called from the
1853        dispatcher.
1854
1855        Creates a working directory, splits the incoming description using the
1856        splitter script and parses out the avrious subsections using the
1857        lcasses above.  Once each sub-experiment is created, use pooled threads
1858        to instantiate them and start it all up.
1859        """
1860        if not self.auth.check_attribute(fid, 'create'):
1861            raise service_error(service_error.access, "Create access denied")
1862
1863        try:
1864            tmpdir = tempfile.mkdtemp(prefix="split-")
1865            os.mkdir(tmpdir+"/keys")
1866        except IOError:
1867            raise service_error(service_error.internal, "Cannot create tmp dir")
1868
1869        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1870        gw_secretkey_base = "fed.%s" % self.ssh_type
1871        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1872        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1873        tclfile = tmpdir + "/experiment.tcl"
1874        tbparams = { }
1875        try:
1876            access_user = self.accessdb[fid]
1877        except KeyError:
1878            raise service_error(service_error.internal,
1879                    "Access map and authorizer out of sync in " + \
1880                            "create_experiment for fedid %s"  % fid)
1881
1882        pid = "dummy"
1883        gid = "dummy"
1884
1885        req = req.get('CreateRequestBody', None)
1886        if not req:
1887            raise service_error(service_error.req,
1888                    "Bad request format (no CreateRequestBody)")
1889        # The tcl parser needs to read a file so put the content into that file
1890        descr=req.get('experimentdescription', None)
1891        if descr:
1892            file_content=descr.get('ns2description', None)
1893            if file_content:
1894                try:
1895                    f = open(tclfile, 'w')
1896                    f.write(file_content)
1897                    f.close()
1898                except IOError:
1899                    raise service_error(service_error.internal,
1900                            "Cannot write temp experiment description")
1901            else:
1902                raise service_error(service_error.req, 
1903                        "Only ns2descriptions supported")
1904        else:
1905            raise service_error(service_error.req, "No experiment description")
1906
1907        # Generate an ID for the experiment (slice) and a certificate that the
1908        # allocator can use to prove they own it.  We'll ship it back through
1909        # the encrypted connection.
1910        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1911
1912        eid = self.create_experiment_state(fid, req, expid, expcert)
1913        try: 
1914            # This catches exceptions to clear the placeholder if necessary
1915            try:
1916                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1917            except ValueError:
1918                raise service_error(service_error.server_config, 
1919                        "Bad key type (%s)" % self.ssh_type)
1920
1921            user = req.get('user', None)
1922            if user == None:
1923                raise service_error(service_error.req, "No user")
1924
1925            master = req.get('master', None)
1926            if not master:
1927                raise service_error(service_error.req,
1928                        "No master testbed label")
1929            export_project = req.get('exportProject', None)
1930            if not export_project:
1931                raise service_error(service_error.req, "No export project")
1932           
1933            # Translate to topdl
1934            if self.splitter_url:
1935                # XXX: need remote topdl translator
1936                self.log.debug("Calling remote splitter at %s" % \
1937                        self.splitter_url)
1938                split_data = self.remote_splitter(self.splitter_url,
1939                        file_content, master)
1940            else:
1941                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1942                    str(self.muxmax), '-m', master]
1943
1944                if self.fedkit:
1945                    tclcmd.append('-k')
1946
1947                if self.gatewaykit:
1948                    tclcmd.append('-K')
1949
1950                tclcmd.extend([pid, gid, eid, tclfile])
1951
1952                self.log.debug("running local splitter %s", " ".join(tclcmd))
1953                # This is just fantastic.  As a side effect the parser copies
1954                # tb_compat.tcl into the current directory, so that directory
1955                # must be writable by the fedd user.  Doing this in the
1956                # temporary subdir ensures this is the case.
1957                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1958                        cwd=tmpdir)
1959                split_data = tclparser.stdout
1960
1961            top = topdl.topology_from_xml(file=split_data, top="experiment")
1962
1963            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1964             # Find the testbeds to look up
1965            testbeds = set([ a.value for e in top.elements \
1966                    for a in e.attribute \
1967                    if a.attribute == 'testbed'] )
1968
1969            allocated = { }         # Testbeds we can access
1970            topo ={ }               # Sub topologies
1971            self.get_access_to_testbeds(testbeds, user, access_user, 
1972                    export_project, master, allocated, tbparams)
1973            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1974
1975            # Copy configuration files into the remote file store
1976            # The config urlpath
1977            configpath = "/%s/config" % expid
1978            # The config file system location
1979            configdir ="%s%s" % ( self.repodir, configpath)
1980            try:
1981                os.makedirs(configdir)
1982            except IOError, e:
1983                raise service_error(
1984                        "Cannot create config directory: %s" % e)
1985            try:
1986                f = open("%s/hosts" % configdir, "w")
1987                f.write('\n'.join(hosts))
1988                f.close()
1989            except IOError, e:
1990                raise service_error(service_error.internal, 
1991                        "Cannot write hosts file: %s" % e)
1992            try:
1993                copy_file("%s" % gw_pubkey, "%s/%s" % \
1994                        (configdir, gw_pubkey_base))
1995                copy_file("%s" % gw_secretkey, "%s/%s" % \
1996                        (configdir, gw_secretkey_base))
1997            except IOError, e:
1998                raise service_error(service_error.internal, 
1999                        "Cannot copy keyfiles: %s" % e)
2000
2001            # Allow the individual testbeds to access the configuration files.
2002            for tb in tbparams.keys():
2003                asignee = tbparams[tb]['allocID']['fedid']
2004                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2005                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2006
2007            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2008            # Now get access to the dynamic testbeds
2009            for k, t in topo.items():
2010                if not t.get_attribute('dynamic'):
2011                    continue
2012                tb = t.get_attribute('testbed')
2013                if tb: 
2014                    self.get_access(tb, None, user, tbparams, master, 
2015                            export_project, access_user)
2016                    tbparams[k] = tbparams[tb]
2017                    del tbparams[tb]
2018                    allocated[k] = 1
2019                else:
2020                    raise service_error(service_error.internal, 
2021                            "Dynamic allocation from no testbed!?")
2022
2023            self.wrangle_software(expid, top, topo, tbparams)
2024
2025            vtopo = topdl.topology_to_vtopo(top)
2026            vis = self.genviz(vtopo)
2027
2028            # save federant information
2029            for k in allocated.keys():
2030                tbparams[k]['federant'] = {\
2031                        'name': [ { 'localname' : eid} ],\
2032                        'allocID' : tbparams[k]['allocID'],\
2033                        'master' : k == master,\
2034                    }
2035                if tbparams[k].has_key('emulab'):
2036                        tbparams[k]['federant']['emulab'] = \
2037                                tbparams[k]['emulab']
2038
2039            self.state_lock.acquire()
2040            self.state[eid]['vtopo'] = vtopo
2041            self.state[eid]['vis'] = vis
2042            self.state[expid]['federant'] = \
2043                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2044                        if tbparams[tb].has_key('federant') ]
2045            if self.state_filename: 
2046                self.write_state()
2047            self.state_lock.release()
2048        except service_error, e:
2049            # If something goes wrong in the parse (usually an access error)
2050            # clear the placeholder state.  From here on out the code delays
2051            # exceptions.  Failing at this point returns a fault to the remote
2052            # caller.
2053
2054            self.state_lock.acquire()
2055            del self.state[eid]
2056            del self.state[expid]
2057            if self.state_filename: self.write_state()
2058            self.state_lock.release()
2059            raise e
2060
2061
2062        # Start the background swapper and return the starting state.  From
2063        # here on out, the state will stick around a while.
2064
2065        # Let users touch the state
2066        self.auth.set_attribute(fid, expid)
2067        self.auth.set_attribute(expid, expid)
2068        # Override fedids can manipulate state as well
2069        for o in self.overrides:
2070            self.auth.set_attribute(o, expid)
2071
2072        # Create a logger that logs to the experiment's state object as well as
2073        # to the main log file.
2074        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2075        alloc_collector = self.list_log(self.state[eid]['log'])
2076        h = logging.StreamHandler(alloc_collector)
2077        # XXX: there should be a global one of these rather than repeating the
2078        # code.
2079        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2080                    '%d %b %y %H:%M:%S'))
2081        alloc_log.addHandler(h)
2082       
2083        # XXX
2084        url_base = 'https://users.isi.deterlab.net:23232'
2085        attrs = [ 
2086                {
2087                    'attribute': 'ssh_pubkey', 
2088                    'value': '%s/%s/config/%s' % \
2089                            (url_base, expid, gw_pubkey_base)
2090                },
2091                {
2092                    'attribute': 'ssh_secretkey', 
2093                    'value': '%s/%s/config/%s' % \
2094                            (url_base, expid, gw_secretkey_base)
2095                },
2096                {
2097                    'attribute': 'hosts', 
2098                    'value': '%s/%s/config/hosts' % \
2099                            (url_base, expid)
2100                },
2101                {
2102                    'attribute': 'experiment_name',
2103                    'value': eid,
2104                },
2105            ]
2106
2107        # Start a thread to do the resource allocation
2108        t  = Thread(target=self.allocate_resources,
2109                args=(allocated, master, eid, expid, expcert, tbparams, 
2110                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2111                name=eid)
2112        t.start()
2113
2114        rv = {
2115                'experimentID': [
2116                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2117                ],
2118                'experimentStatus': 'starting',
2119                'experimentAccess': { 'X509' : expcert }
2120            }
2121
2122        return rv
2123   
2124    def get_experiment_fedid(self, key):
2125        """
2126        find the fedid associated with the localname key in the state database.
2127        """
2128
2129        rv = None
2130        self.state_lock.acquire()
2131        if self.state.has_key(key):
2132            if isinstance(self.state[key], dict):
2133                try:
2134                    kl = [ f['fedid'] for f in \
2135                            self.state[key]['experimentID']\
2136                                if f.has_key('fedid') ]
2137                except KeyError:
2138                    self.state_lock.release()
2139                    raise service_error(service_error.internal, 
2140                            "No fedid for experiment %s when getting "+\
2141                                    "fedid(!?)" % key)
2142                if len(kl) == 1:
2143                    rv = kl[0]
2144                else:
2145                    self.state_lock.release()
2146                    raise service_error(service_error.internal, 
2147                            "multiple fedids for experiment %s when " +\
2148                                    "getting fedid(!?)" % key)
2149            else:
2150                self.state_lock.release()
2151                raise service_error(service_error.internal, 
2152                        "Unexpected state for %s" % key)
2153        self.state_lock.release()
2154        return rv
2155
2156    def check_experiment_access(self, fid, key):
2157        """
2158        Confirm that the fid has access to the experiment.  Though a request
2159        may be made in terms of a local name, the access attribute is always
2160        the experiment's fedid.
2161        """
2162        if not isinstance(key, fedid):
2163            key = self.get_experiment_fedid(key)
2164
2165        if self.auth.check_attribute(fid, key):
2166            return True
2167        else:
2168            raise service_error(service_error.access, "Access Denied")
2169
2170
2171    def get_handler(self, path, fid):
2172        if self.auth.check_attribute(fid, path):
2173            return ("%s/%s" % (self.repodir, path), "application/binary")
2174        else:
2175            return (None, None)
2176
2177    def get_vtopo(self, req, fid):
2178        """
2179        Return the stored virtual topology for this experiment
2180        """
2181        rv = None
2182        state = None
2183
2184        req = req.get('VtopoRequestBody', None)
2185        if not req:
2186            raise service_error(service_error.req,
2187                    "Bad request format (no VtopoRequestBody)")
2188        exp = req.get('experiment', None)
2189        if exp:
2190            if exp.has_key('fedid'):
2191                key = exp['fedid']
2192                keytype = "fedid"
2193            elif exp.has_key('localname'):
2194                key = exp['localname']
2195                keytype = "localname"
2196            else:
2197                raise service_error(service_error.req, "Unknown lookup type")
2198        else:
2199            raise service_error(service_error.req, "No request?")
2200
2201        self.check_experiment_access(fid, key)
2202
2203        self.state_lock.acquire()
2204        if self.state.has_key(key):
2205            if self.state[key].has_key('vtopo'):
2206                rv = { 'experiment' : {keytype: key },\
2207                        'vtopo': self.state[key]['vtopo'],\
2208                    }
2209            else:
2210                state = self.state[key]['experimentStatus']
2211        self.state_lock.release()
2212
2213        if rv: return rv
2214        else: 
2215            if state:
2216                raise service_error(service_error.partial, 
2217                        "Not ready: %s" % state)
2218            else:
2219                raise service_error(service_error.req, "No such experiment")
2220
2221    def get_vis(self, req, fid):
2222        """
2223        Return the stored visualization for this experiment
2224        """
2225        rv = None
2226        state = None
2227
2228        req = req.get('VisRequestBody', None)
2229        if not req:
2230            raise service_error(service_error.req,
2231                    "Bad request format (no VisRequestBody)")
2232        exp = req.get('experiment', None)
2233        if exp:
2234            if exp.has_key('fedid'):
2235                key = exp['fedid']
2236                keytype = "fedid"
2237            elif exp.has_key('localname'):
2238                key = exp['localname']
2239                keytype = "localname"
2240            else:
2241                raise service_error(service_error.req, "Unknown lookup type")
2242        else:
2243            raise service_error(service_error.req, "No request?")
2244
2245        self.check_experiment_access(fid, key)
2246
2247        self.state_lock.acquire()
2248        if self.state.has_key(key):
2249            if self.state[key].has_key('vis'):
2250                rv =  { 'experiment' : {keytype: key },\
2251                        'vis': self.state[key]['vis'],\
2252                        }
2253            else:
2254                state = self.state[key]['experimentStatus']
2255        self.state_lock.release()
2256
2257        if rv: return rv
2258        else:
2259            if state:
2260                raise service_error(service_error.partial, 
2261                        "Not ready: %s" % state)
2262            else:
2263                raise service_error(service_error.req, "No such experiment")
2264
2265    def clean_info_response(self, rv):
2266        """
2267        Remove the information in the experiment's state object that is not in
2268        the info response.
2269        """
2270        # Remove the owner info (should always be there, but...)
2271        if rv.has_key('owner'): del rv['owner']
2272
2273        # Convert the log into the allocationLog parameter and remove the
2274        # log entry (with defensive programming)
2275        if rv.has_key('log'):
2276            rv['allocationLog'] = "".join(rv['log'])
2277            del rv['log']
2278        else:
2279            rv['allocationLog'] = ""
2280
2281        if rv['experimentStatus'] != 'active':
2282            if rv.has_key('federant'): del rv['federant']
2283        else:
2284            # remove the allocationID and uri info from each federant
2285            for f in rv.get('federant', []):
2286                if f.has_key('allocID'): del f['allocID']
2287                if f.has_key('uri'): del f['uri']
2288        return rv
2289
2290    def get_info(self, req, fid):
2291        """
2292        Return all the stored info about this experiment
2293        """
2294        rv = None
2295
2296        req = req.get('InfoRequestBody', None)
2297        if not req:
2298            raise service_error(service_error.req,
2299                    "Bad request format (no InfoRequestBody)")
2300        exp = req.get('experiment', None)
2301        if exp:
2302            if exp.has_key('fedid'):
2303                key = exp['fedid']
2304                keytype = "fedid"
2305            elif exp.has_key('localname'):
2306                key = exp['localname']
2307                keytype = "localname"
2308            else:
2309                raise service_error(service_error.req, "Unknown lookup type")
2310        else:
2311            raise service_error(service_error.req, "No request?")
2312
2313        self.check_experiment_access(fid, key)
2314
2315        # The state may be massaged by the service function that called
2316        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2317        # state.
2318        self.state_lock.acquire()
2319        if self.state.has_key(key):
2320            rv = copy.deepcopy(self.state[key])
2321        self.state_lock.release()
2322
2323        if rv:
2324            return self.clean_info_response(rv)
2325        else:
2326            raise service_error(service_error.req, "No such experiment")
2327
2328    def get_multi_info(self, req, fid):
2329        """
2330        Return all the stored info that this fedid can access
2331        """
2332        rv = { 'info': [ ] }
2333
2334        self.state_lock.acquire()
2335        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2336            self.check_experiment_access(fid, key)
2337
2338            if self.state.has_key(key):
2339                e = copy.deepcopy(self.state[key])
2340                e = self.clean_info_response(e)
2341                rv['info'].append(e)
2342        self.state_lock.release()
2343        return rv
2344
2345    def terminate_experiment(self, req, fid):
2346        """
2347        Swap this experiment out on the federants and delete the shared
2348        information
2349        """
2350        tbparams = { }
2351        req = req.get('TerminateRequestBody', None)
2352        if not req:
2353            raise service_error(service_error.req,
2354                    "Bad request format (no TerminateRequestBody)")
2355        force = req.get('force', False)
2356        exp = req.get('experiment', None)
2357        if exp:
2358            if exp.has_key('fedid'):
2359                key = exp['fedid']
2360                keytype = "fedid"
2361            elif exp.has_key('localname'):
2362                key = exp['localname']
2363                keytype = "localname"
2364            else:
2365                raise service_error(service_error.req, "Unknown lookup type")
2366        else:
2367            raise service_error(service_error.req, "No request?")
2368
2369        self.check_experiment_access(fid, key)
2370
2371        dealloc_list = [ ]
2372
2373
2374        # Create a logger that logs to the dealloc_list as well as to the main
2375        # log file.
2376        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2377        h = logging.StreamHandler(self.list_log(dealloc_list))
2378        # XXX: there should be a global one of these rather than repeating the
2379        # code.
2380        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2381                    '%d %b %y %H:%M:%S'))
2382        dealloc_log.addHandler(h)
2383
2384        self.state_lock.acquire()
2385        fed_exp = self.state.get(key, None)
2386
2387        if fed_exp:
2388            # This branch of the conditional holds the lock to generate a
2389            # consistent temporary tbparams variable to deallocate experiments.
2390            # It releases the lock to do the deallocations and reacquires it to
2391            # remove the experiment state when the termination is complete.
2392
2393            # First make sure that the experiment creation is complete.
2394            status = fed_exp.get('experimentStatus', None)
2395
2396            if status:
2397                if status in ('starting', 'terminating'):
2398                    if not force:
2399                        self.state_lock.release()
2400                        raise service_error(service_error.partial, 
2401                                'Experiment still being created or destroyed')
2402                    else:
2403                        self.log.warning('Experiment in %s state ' % status + \
2404                                'being terminated by force.')
2405            else:
2406                # No status??? trouble
2407                self.state_lock.release()
2408                raise service_error(service_error.internal,
2409                        "Experiment has no status!?")
2410
2411            ids = []
2412            #  experimentID is a list of dicts that are self-describing
2413            #  identifiers.  This finds all the fedids and localnames - the
2414            #  keys of self.state - and puts them into ids.
2415            for id in fed_exp.get('experimentID', []):
2416                if id.has_key('fedid'): ids.append(id['fedid'])
2417                if id.has_key('localname'): ids.append(id['localname'])
2418
2419            # Collect the allocation/segment ids
2420            for fed in fed_exp.get('federant', []):
2421                try:
2422                    tb = fed['uri']
2423                    aid = fed['allocID']
2424                except KeyError, e:
2425                    continue
2426                tbparams[tb] = aid
2427            fed_exp['experimentStatus'] = 'terminating'
2428            if self.state_filename: self.write_state()
2429            self.state_lock.release()
2430
2431            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2432            # then completes, so we can't wait if nothing starts.  So, no
2433            # tbparams, no start.
2434            if len(tbparams) > 0:
2435                thread_pool = self.thread_pool(self.nthreads)
2436                for tb in tbparams.keys():
2437                    # Create and start a thread to stop the segment
2438                    thread_pool.wait_for_slot()
2439                    uri = self.tbmap.get(tb, None)
2440                    t  = self.pooled_thread(\
2441                            target=self.terminate_segment(log=dealloc_log,
2442                                testbed=tb,
2443                                cert_file=self.cert_file, 
2444                                cert_pwd=self.cert_pwd,
2445                                trusted_certs=self.trusted_certs,
2446                                caller=self.call_TerminateSegment),
2447                            args=(uri, tbparams[tb]), name=tb,
2448                            pdata=thread_pool, trace_file=self.trace_file)
2449                    t.start()
2450                # Wait for completions
2451                thread_pool.wait_for_all_done()
2452
2453            # release the allocations (failed experiments have done this
2454            # already, and starting experiments may be in odd states, so we
2455            # ignore errors releasing those allocations
2456            try: 
2457                for tb in tbparams.keys():
2458                    self.release_access(tb, tbparams[tb])
2459            except service_error, e:
2460                if status != 'failed' and not force:
2461                    raise e
2462
2463            # Remove the terminated experiment
2464            self.state_lock.acquire()
2465            for id in ids:
2466                if self.state.has_key(id): del self.state[id]
2467
2468            if self.state_filename: self.write_state()
2469            self.state_lock.release()
2470
2471            return { 
2472                    'experiment': exp , 
2473                    'deallocationLog': "".join(dealloc_list),
2474                    }
2475        else:
2476            # Don't forget to release the lock
2477            self.state_lock.release()
2478            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.