source: fedd/federation/experiment_control.py @ 99eb8cf

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 99eb8cf was 99eb8cf, checked in by Ted Faber <faber@…>, 14 years ago

More credential changes and removal of user/project stuff

  • Property mode set to 100644
File size: 88.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, tbparam, master, export_project,
728            access_user):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # Tweak search order so that if there are entries in access_user that
738        # have a project matching the export project, we try them first
739        if export_project and export_project.has_key('localname'): 
740            pn = export_project['localname'] 
741               
742            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
743            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
744        else: 
745            access_sequence = access_user
746
747        for p, u in access_sequence: 
748            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
749                    "to %s") %  ((p or "None"), u, uri))
750
751            if p:
752                # Request with user and project specified
753                req = {\
754                        'destinationTestbed' : { 'uri' : uri },
755                        'credential': [ "project: %s" % p, "user: %s"  % u],
756                        'allocID' : { 'localname': 'test' },
757                    }
758            else:
759                # Request with only user specified
760                req = {\
761                        'destinationTestbed' : { 'uri' : uri },
762                        'credential': [ 'user: %s' % u ],
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771
772            # node resources if any
773            if nodes != None and len(nodes) > 0:
774                rnodes = [ ]
775                for n in nodes:
776                    rn = { }
777                    image, hw, count = n.split(":")
778                    if image: rn['image'] = [ image ]
779                    if hw: rn['hardware'] = [ hw ]
780                    if count and int(count) >0 : rn['count'] = int(count)
781                    rnodes.append(rn)
782                req['resources']= { }
783                req['resources']['node'] = rnodes
784
785            try:
786                if self.local_access.has_key(uri):
787                    # Local access call
788                    req = { 'RequestAccessRequestBody' : req }
789                    r = self.local_access[uri].RequestAccess(req, 
790                            fedid(file=self.cert_file))
791                    r = { 'RequestAccessResponseBody' : r }
792                else:
793                    r = self.call_RequestAccess(uri, req, 
794                            self.cert_file, self.cert_pwd, self.trusted_certs)
795            except service_error, e:
796                if e.code == service_error.access:
797                    self.log.debug("[get_access] Access denied")
798                    r = None
799                    continue
800                else:
801                    raise e
802
803            if r.has_key('RequestAccessResponseBody'):
804                # Through to here we have a valid response, not a fault.
805                # Access denied is a fault, so something better or worse than
806                # access denied has happened.
807                r = r['RequestAccessResponseBody']
808                self.log.debug("[get_access] Access granted")
809                break
810            else:
811                raise service_error(service_error.protocol,
812                        "Bad proxy response")
813       
814        if not r:
815            raise service_error(service_error.access, 
816                    "Access denied by %s (%s)" % (tb, uri))
817
818        if r.has_key('emulab'):
819            e = r['emulab']
820            p = e['project']
821            tbparam[tb] = { 
822                    "boss": e['boss'],
823                    "host": e['ops'],
824                    "domain": e['domain'],
825                    "fs": e['fileServer'],
826                    "eventserver": e['eventServer'],
827                    "project": unpack_id(p['name']),
828                    "emulab" : e,
829                    "allocID" : r['allocID'],
830                    "uri": uri,
831                    }
832            # Make the testbed name be the label the user applied
833            p['testbed'] = {'localname': tb }
834
835            for u in p['user']:
836                role = u.get('role', None)
837                if role == 'experimentCreation':
838                    tbparam[tb]['user'] = unpack_id(u['userID'])
839                    break
840            else:
841                raise service_error(service_error.internal, 
842                        "No createExperimentUser from %s" %tb)
843            # Add attributes to parameter space.  We don't allow attributes to
844            # overlay any parameters already installed.
845            for a in e['fedAttr']:
846                try:
847                    if a['attribute'] and \
848                            isinstance(a['attribute'], basestring)\
849                            and not tbparam[tb].has_key(a['attribute'].lower()):
850                        tbparam[tb][a['attribute'].lower()] = a['value']
851                except KeyError:
852                    self.log.error("Bad attribute in response: %s" % a)
853        else:
854            tbparam[tb] = { 
855                "allocID" : r['allocID'],
856                "uri": uri,
857            }
858
859    def release_access(self, tb, aid, uri=None):
860        """
861        Release access to testbed through fedd
862        """
863
864        if not uri:
865            uri = self.tbmap.get(tb, None)
866        if not uri:
867            raise service_error(service_error.server_config, 
868                    "Unknown testbed: %s" % tb)
869
870        if self.local_access.has_key(uri):
871            resp = self.local_access[uri].ReleaseAccess(\
872                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
873                    fedid(file=self.cert_file))
874            resp = { 'ReleaseAccessResponseBody': resp } 
875        else:
876            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
877                    self.cert_file, self.cert_pwd, self.trusted_certs)
878
879        # better error coding
880
881    def remote_splitter(self, uri, desc, master):
882
883        req = {
884                'description' : { 'ns2description': desc },
885                'master': master,
886                'include_fedkit': bool(self.fedkit),
887                'include_gatewaykit': bool(self.gatewaykit)
888            }
889
890        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
891                self.trusted_certs)
892
893        if r.has_key('Ns2SplitResponseBody'):
894            r = r['Ns2SplitResponseBody']
895            if r.has_key('output'):
896                return r['output'].splitlines()
897            else:
898                raise service_error(service_error.protocol, 
899                        "Bad splitter response (no output)")
900        else:
901            raise service_error(service_error.protocol, "Bad splitter response")
902
903    class start_segment:
904        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
905                cert_pwd=None, trusted_certs=None, caller=None,
906                log_collector=None):
907            self.log = log
908            self.debug = debug
909            self.cert_file = cert_file
910            self.cert_pwd = cert_pwd
911            self.trusted_certs = None
912            self.caller = caller
913            self.testbed = testbed
914            self.log_collector = log_collector
915            self.response = None
916
917        def __call__(self, uri, aid, topo, master, attrs=None):
918            req = {
919                    'allocID': { 'fedid' : aid }, 
920                    'segmentdescription': { 
921                        'topdldescription': topo.to_dict(),
922                    },
923                    'master': master,
924                }
925            if attrs:
926                req['fedAttr'] = attrs
927
928            try:
929                self.log.debug("Calling StartSegment at %s " % uri)
930                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
931                        self.trusted_certs)
932                if r.has_key('StartSegmentResponseBody'):
933                    lval = r['StartSegmentResponseBody'].get('allocationLog',
934                            None)
935                    if lval and self.log_collector:
936                        for line in  lval.splitlines(True):
937                            self.log_collector.write(line)
938                    self.response = r
939                else:
940                    raise service_error(service_error.internal, 
941                            "Bad response!?: %s" %r)
942                return True
943            except service_error, e:
944                self.log.error("Start segment failed on %s: %s" % \
945                        (self.testbed, e))
946                return False
947
948
949
950    class terminate_segment:
951        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
952                cert_pwd=None, trusted_certs=None, caller=None):
953            self.log = log
954            self.debug = debug
955            self.cert_file = cert_file
956            self.cert_pwd = cert_pwd
957            self.trusted_certs = None
958            self.caller = caller
959            self.testbed = testbed
960
961        def __call__(self, uri, aid ):
962            req = {
963                    'allocID': aid , 
964                }
965            try:
966                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
967                        self.trusted_certs)
968                return True
969            except service_error, e:
970                self.log.error("Terminate segment failed on %s: %s" % \
971                        (self.testbed, e))
972                return False
973   
974
975    def allocate_resources(self, allocated, master, eid, expid, 
976            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
977            attrs=None):
978        def get_vlan(r):
979            if r.has_key('StartSegmentResponseBody'):
980                srb = r['StartSegmentResponseBody']
981                if srb.has_key('fedAttr'):
982                    for k, v in [ (a['attribute'], a['value']) \
983                            for a in srb['fedAttr']]:
984                        if k == 'vlan': return v
985            return None
986
987        started = { }           # Testbeds where a sub-experiment started
988                                # successfully
989
990        # XXX
991        fail_soft = False
992
993        slaves = [ k for k in allocated.keys() \
994                if k != master and not topo[k].get_attribute('transit')]
995        transit = [ k for k in allocated.keys() \
996                if topo[k].get_attribute('transit')]
997
998        log = alloc_log or self.log
999
1000        thread_pool = self.thread_pool(self.nthreads)
1001        threads = [ ]
1002
1003        for tb in transit:
1004            uri = tbparams[tb]['uri']
1005            if tbparams[tb].has_key('allocID') and \
1006                    tbparams[tb]['allocID'].has_key('fedid'):
1007                aid = tbparams[tb]['allocID']['fedid']
1008            else:
1009                raise service_error(service_error.internal, 
1010                        "No alloc id for testbed %s !?" % tb)
1011
1012            m = re.search('(\d+)', tb)
1013            if m:
1014                to_repl = "unassigned%s" % m.group(1)
1015            else:
1016                raise service_error(service_error.internal, 
1017                        "Bad dynamic allocation name")
1018                break
1019
1020            ss = self.start_segment(log=log, debug=self.debug, 
1021                testbed=tb, cert_file=self.cert_file, 
1022                cert_pwd=self.cert_pwd, 
1023                trusted_certs=self.trusted_certs,
1024                caller=self.call_StartSegment,
1025                log_collector=log_collector)
1026            t = self.pooled_thread(
1027                    target=ss,
1028                    args =(uri, aid, topo[tb], False, attrs),
1029                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1030            threads.append(t)
1031            t.start()
1032            # Wait until the this transit node finishes (keep pinging the log,
1033            # though)
1034
1035            mins = 0
1036            while not thread_pool.wait_for_all_done(60.0):
1037                mins += 1
1038                alloc_log.info("Waiting for transit (it has been %d mins)" \
1039                        % mins)
1040
1041            if t.rv:
1042                vlan = get_vlan(ss.response)
1043                if vlan is not None:
1044                    for k, t in topo.items():
1045                        for e in t.elements:
1046                            for i in e.interface:
1047                                vl = i.get_attribute('dragon_vlan')
1048                                if vl is not None and vl == to_repl:
1049                                    i.set_attribute('dragon_vlan', vlan)
1050            else:
1051                break
1052            thread_pool.clear()
1053
1054
1055        failed = [ t.getName() for t in threads if not t.rv ]
1056
1057        if len(failed) == 0:
1058            for tb in slaves:
1059                # Create and start a thread to start the segment, and save it
1060                # to get the return value later
1061                thread_pool.wait_for_slot()
1062                uri = self.tbmap.get(tb, None)
1063                if not uri:
1064                    raise service_error(service_error.internal, 
1065                            "Unknown testbed %s !?" % tb)
1066
1067                if tbparams[tb].has_key('allocID') and \
1068                        tbparams[tb]['allocID'].has_key('fedid'):
1069                    aid = tbparams[tb]['allocID']['fedid']
1070                else:
1071                    raise service_error(service_error.internal, 
1072                            "No alloc id for testbed %s !?" % tb)
1073
1074                t  = self.pooled_thread(\
1075                        target=self.start_segment(log=log, debug=self.debug,
1076                            testbed=tb, cert_file=self.cert_file,
1077                            cert_pwd=self.cert_pwd,
1078                            trusted_certs=self.trusted_certs,
1079                            caller=self.call_StartSegment,
1080                            log_collector=log_collector), 
1081                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1082                        pdata=thread_pool, trace_file=self.trace_file)
1083                threads.append(t)
1084                t.start()
1085
1086            # Wait until all finish (keep pinging the log, though)
1087            mins = 0
1088            while not thread_pool.wait_for_all_done(60.0):
1089                mins += 1
1090                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1091                        % mins)
1092
1093            thread_pool.clear()
1094
1095        # If none failed, start the master
1096        failed = [ t.getName() for t in threads if not t.rv ]
1097
1098        if len(failed) == 0:
1099            uri = self.tbmap.get(master, None)
1100            if not uri:
1101                raise service_error(service_error.internal, 
1102                        "Unknown testbed %s !?" % master)
1103
1104            if tbparams[master].has_key('allocID') and \
1105                    tbparams[master]['allocID'].has_key('fedid'):
1106                aid = tbparams[master]['allocID']['fedid']
1107            else:
1108                raise service_error(service_error.internal, 
1109                    "No alloc id for testbed %s !?" % master)
1110            t = self.pooled_thread(
1111                    target=self.start_segment(log=log, debug=self.debug, 
1112                        testbed=master, cert_file=self.cert_file, 
1113                        cert_pwd=self.cert_pwd, 
1114                        trusted_certs=self.trusted_certs,
1115                        caller=self.call_StartSegment,
1116                        log_collector=log_collector),
1117                    args =(uri, aid, topo[master], True, attrs),
1118                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1119            threads.append(t)
1120            t.start()
1121            # Wait until the master finishes (keep pinging the log, though)
1122            mins = 0
1123            while not thread_pool.wait_for_all_done(60.0):
1124                mins += 1
1125                alloc_log.info("Waiting for master (it has been %d mins)" \
1126                        % mins)
1127            # update failed to include the master, if it failed
1128            failed = [ t.getName() for t in threads if not t.rv ]
1129
1130        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1131        # If one failed clean up, unless fail_soft is set
1132        if failed:
1133            if not fail_soft:
1134                thread_pool.clear()
1135                for tb in succeeded:
1136                    # Create and start a thread to stop the segment
1137                    thread_pool.wait_for_slot()
1138                    uri = tbparams[tb]['uri']
1139                    t  = self.pooled_thread(\
1140                            target=self.terminate_segment(log=log,
1141                                testbed=tb,
1142                                cert_file=self.cert_file, 
1143                                cert_pwd=self.cert_pwd,
1144                                trusted_certs=self.trusted_certs,
1145                                caller=self.call_TerminateSegment),
1146                            args=(uri, tbparams[tb]['federant']['allocID']),
1147                            name=tb,
1148                            pdata=thread_pool, trace_file=self.trace_file)
1149                    t.start()
1150                # Wait until all finish
1151                thread_pool.wait_for_all_done()
1152
1153                # release the allocations
1154                for tb in tbparams.keys():
1155                    self.release_access(tb, tbparams[tb]['allocID'],
1156                            tbparams[tb].get('uri', None))
1157                # Remove the placeholder
1158                self.state_lock.acquire()
1159                self.state[eid]['experimentStatus'] = 'failed'
1160                if self.state_filename: self.write_state()
1161                self.state_lock.release()
1162
1163                log.error("Swap in failed on %s" % ",".join(failed))
1164                return
1165        else:
1166            log.info("[start_segment]: Experiment %s active" % eid)
1167
1168
1169        # Walk up tmpdir, deleting as we go
1170        if self.cleanup:
1171            log.debug("[start_experiment]: removing %s" % tmpdir)
1172            for path, dirs, files in os.walk(tmpdir, topdown=False):
1173                for f in files:
1174                    os.remove(os.path.join(path, f))
1175                for d in dirs:
1176                    os.rmdir(os.path.join(path, d))
1177            os.rmdir(tmpdir)
1178        else:
1179            log.debug("[start_experiment]: not removing %s" % tmpdir)
1180
1181        # Insert the experiment into our state and update the disk copy
1182        self.state_lock.acquire()
1183        self.state[expid]['experimentStatus'] = 'active'
1184        self.state[eid] = self.state[expid]
1185        if self.state_filename: self.write_state()
1186        self.state_lock.release()
1187        return
1188
1189
1190    def add_kit(self, e, kit):
1191        """
1192        Add a Software object created from the list of (install, location)
1193        tuples passed as kit  to the software attribute of an object e.  We
1194        do this enough to break out the code, but it's kind of a hack to
1195        avoid changing the old tuple rep.
1196        """
1197
1198        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1199
1200        if isinstance(e.software, list): e.software.extend(s)
1201        else: e.software = s
1202
1203
1204    def create_experiment_state(self, fid, req, expid, expcert, 
1205            state='starting'):
1206        """
1207        Create the initial entry in the experiment's state.  The expid and
1208        expcert are the experiment's fedid and certifacte that represents that
1209        ID, which are installed in the experiment state.  If the request
1210        includes a suggested local name that is used if possible.  If the local
1211        name is already taken by an experiment owned by this user that has
1212        failed, it is overwritten.  Otherwise new letters are added until a
1213        valid localname is found.  The generated local name is returned.
1214        """
1215
1216        if req.has_key('experimentID') and \
1217                req['experimentID'].has_key('localname'):
1218            overwrite = False
1219            eid = req['experimentID']['localname']
1220            # If there's an old failed experiment here with the same local name
1221            # and accessible by this user, we'll overwrite it, otherwise we'll
1222            # fall through and do the collision avoidance.
1223            old_expid = self.get_experiment_fedid(eid)
1224            if old_expid and self.check_experiment_access(fid, old_expid):
1225                self.state_lock.acquire()
1226                status = self.state[eid].get('experimentStatus', None)
1227                if status and status == 'failed':
1228                    # remove the old access attribute
1229                    self.auth.unset_attribute(fid, old_expid)
1230                    overwrite = True
1231                    del self.state[eid]
1232                    del self.state[old_expid]
1233                self.state_lock.release()
1234            self.state_lock.acquire()
1235            while (self.state.has_key(eid) and not overwrite):
1236                eid += random.choice(string.ascii_letters)
1237            # Initial state
1238            self.state[eid] = {
1239                    'experimentID' : \
1240                            [ { 'localname' : eid }, {'fedid': expid } ],
1241                    'experimentStatus': state,
1242                    'experimentAccess': { 'X509' : expcert },
1243                    'owner': fid,
1244                    'log' : [],
1245                }
1246            self.state[expid] = self.state[eid]
1247            if self.state_filename: self.write_state()
1248            self.state_lock.release()
1249        else:
1250            eid = self.exp_stem
1251            for i in range(0,5):
1252                eid += random.choice(string.ascii_letters)
1253            self.state_lock.acquire()
1254            while (self.state.has_key(eid)):
1255                eid = self.exp_stem
1256                for i in range(0,5):
1257                    eid += random.choice(string.ascii_letters)
1258            # Initial state
1259            self.state[eid] = {
1260                    'experimentID' : \
1261                            [ { 'localname' : eid }, {'fedid': expid } ],
1262                    'experimentStatus': state,
1263                    'experimentAccess': { 'X509' : expcert },
1264                    'owner': fid,
1265                    'log' : [],
1266                }
1267            self.state[expid] = self.state[eid]
1268            if self.state_filename: self.write_state()
1269            self.state_lock.release()
1270
1271        return eid
1272
1273
1274    def allocate_ips_to_topo(self, top):
1275        """
1276        Add an ip4_address attribute to all the hosts in the topology, based on
1277        the shared substrates on which they sit.  An /etc/hosts file is also
1278        created and returned as a list of hostfiles entries.  We also return
1279        the allocator, because we may need to allocate IPs to portals
1280        (specifically DRAGON portals).
1281        """
1282        subs = sorted(top.substrates, 
1283                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1284                reverse=True)
1285        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1286        ifs = { }
1287        hosts = [ ]
1288
1289        for idx, s in enumerate(subs):
1290            a = ips.allocate(len(s.interfaces)+2)
1291            if a :
1292                base, num = a
1293                if num < len(s.interfaces) +2 : 
1294                    raise service_error(service_error.internal,
1295                            "Allocator returned wrong number of IPs??")
1296            else:
1297                raise service_error(service_error.req, 
1298                        "Cannot allocate IP addresses")
1299
1300            base += 1
1301            for i in s.interfaces:
1302                i.attribute.append(
1303                        topdl.Attribute('ip4_address', 
1304                            "%s" % ip_addr(base)))
1305                hname = i.element.name[0]
1306                if ifs.has_key(hname):
1307                    hosts.append("%s\t%s-%s %s-%d" % \
1308                            (ip_addr(base), hname, s.name, hname,
1309                                ifs[hname]))
1310                else:
1311                    ifs[hname] = 0
1312                    hosts.append("%s\t%s-%s %s-%d %s" % \
1313                            (ip_addr(base), hname, s.name, hname,
1314                                ifs[hname], hname))
1315
1316                ifs[hname] += 1
1317                base += 1
1318        return hosts, ips
1319
1320    def get_access_to_testbeds(self, testbeds, access_user, 
1321            export_project, master, allocated, tbparams):
1322        """
1323        Request access to the various testbeds required for this instantiation
1324        (passed in as testbeds).  User, access_user, expoert_project and master
1325        are used to construct the correct requests.  Per-testbed parameters are
1326        returned in tbparams.
1327        """
1328        for tb in testbeds:
1329            self.get_access(tb, None, tbparams, master,
1330                    export_project, access_user)
1331            allocated[tb] = 1
1332
1333    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1334        """
1335        Create the sub-topologies that are needed for experimetn instantiation.
1336        Along the way attach startup commands to the computers in the
1337        subtopologies.
1338        """
1339        for tb in testbeds:
1340            topo[tb] = top.clone()
1341            to_delete = [ ]
1342            for e in topo[tb].elements:
1343                etb = e.get_attribute('testbed')
1344                if etb and etb != tb:
1345                    for i in e.interface:
1346                        for s in i.subs:
1347                            try:
1348                                s.interfaces.remove(i)
1349                            except ValueError:
1350                                raise service_error(service_error.internal,
1351                                        "Can't remove interface??")
1352                    to_delete.append(e)
1353            for e in to_delete:
1354                topo[tb].elements.remove(e)
1355            topo[tb].make_indices()
1356
1357            for e in [ e for e in topo[tb].elements \
1358                    if isinstance(e,topdl.Computer)]:
1359                if tb == master:
1360                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1361                else:
1362                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1363                scmd = e.get_attribute('startup')
1364                if scmd:
1365                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1366
1367                e.set_attribute('startup', cmd)
1368                if self.fedkit: self.add_kit(e, self.fedkit)
1369
1370    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1371            portal_type, iface_desc=()):
1372        sproject = tbparams[st].get('project', 'project')
1373        dproject = tbparams[dt].get('project', 'project')
1374        mproject = tbparams[master].get('project', 'project')
1375        sdomain = tbparams[st].get('domain', ".example.com")
1376        ddomain = tbparams[dt].get('domain', ".example.com")
1377        mdomain = tbparams[master].get('domain', '.example.com')
1378        muser = tbparams[master].get('user', 'root')
1379        smbshare = tbparams[master].get('smbshare', 'USERS')
1380        aid = tbparams[dt]['allocID']['fedid']
1381        if st == master or dt == master:
1382            active = ("%s" % (st == master))
1383        else:
1384            active = ("%s" %(st > dt))
1385
1386        ifaces = [ ]
1387        for sub, attrs in iface_desc:
1388            inf = topdl.Interface(
1389                    substrate=sub,
1390                    attribute=[
1391                        topdl.Attribute(
1392                            attribute=n,
1393                            value = v)
1394                        for n, v in attrs
1395                        ]
1396                    )
1397            ifaces.append(inf)
1398        return topdl.Computer(
1399                name=myname,
1400                attribute=[ 
1401                    topdl.Attribute(attribute=n,value=v)
1402                        for n, v in (\
1403                            ('portal', 'true'),
1404                            ('domain', sdomain),
1405                            ('masterdomain', mdomain),
1406                            ('masterexperiment', "%s/%s" % \
1407                                    (mproject, eid)),
1408                            ('masteruser', muser),
1409                            ('smbshare', smbshare),
1410                            ('experiment', "%s/%s" % \
1411                                    (sproject, eid)),
1412                            ('peer', "%s" % desthost),
1413                            ('peer_segment', "%s" % aid),
1414                            ('scriptdir', 
1415                                "/usr/local/federation/bin"),
1416                            ('active', "%s" % active),
1417                            ('portal_type', portal_type), 
1418                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1419                    ],
1420                interface=ifaces,
1421                )
1422
1423    def new_portal_substrate(self, st, dt, eid, tbparams):
1424        ddomain = tbparams[dt].get('domain', ".example.com")
1425        dproject = tbparams[dt].get('project', 'project')
1426        tsubstrate = \
1427                topdl.Substrate(name='%s-%s' % (st, dt),
1428                        attribute= [
1429                            topdl.Attribute(
1430                                attribute='portal',
1431                                value='true')
1432                            ]
1433                        )
1434        segment_element = topdl.Segment(
1435                id= tbparams[dt]['allocID'],
1436                type='emulab',
1437                uri = self.tbmap.get(dt, None),
1438                interface=[ 
1439                    topdl.Interface(
1440                        substrate=tsubstrate.name),
1441                    ],
1442                attribute = [
1443                    topdl.Attribute(attribute=n, value=v)
1444                        for n, v in (\
1445                            ('domain', ddomain),
1446                            ('experiment', "%s/%s" % \
1447                                    (dproject, eid)),)
1448                    ],
1449                )
1450
1451        return (tsubstrate, segment_element)
1452
1453    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1454        if sub.capacity is None:
1455            raise service_error(service_error.internal,
1456                    "Cannot DRAGON split substrate w/o capacity")
1457        segs = [ ]
1458        substr = topdl.Substrate(name="dragon%d" % idx, 
1459                capacity=sub.capacity.clone(),
1460                attribute=[ topdl.Attribute(attribute=n, value=v)
1461                    for n, v, in (\
1462                            ('vlan', 'unassigned%d' % idx),)])
1463        for tb in tbs.keys():
1464            seg = topdl.Segment(
1465                    id = tbparams[tb]['allocID'],
1466                    type='emulab',
1467                    uri = self.tbmap.get(tb, None),
1468                    interface=[ 
1469                        topdl.Interface(
1470                            substrate=substr.name),
1471                        ],
1472                    attribute=[ topdl.Attribute(
1473                        attribute='dragon_endpoint', 
1474                        value=tbparams[tb]['dragon']),
1475                        ]
1476                    )
1477            if tbparams[tb].has_key('vlans'):
1478                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1479            segs.append(seg)
1480
1481        topo["dragon%d" %idx] = \
1482                topdl.Topology(substrates=[substr], elements=segs,
1483                        attribute=[
1484                            topdl.Attribute(attribute="transit", value='true'),
1485                            topdl.Attribute(attribute="dynamic", value='true'),
1486                            topdl.Attribute(attribute="testbed", value='dragon'),
1487                            ]
1488                        )
1489
1490    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1491        """
1492        Add attribiutes to the various elements indicating that they are to be
1493        dragon connected and create a dragon segment in tops to be
1494        instantiated.
1495        """
1496
1497        def get_substrate_from_topo(name, t):
1498            for s in t.substrates:
1499                if s.name == name: return s
1500            else: return None
1501
1502        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1503        elements = [ i.element for i in sub.interfaces ]
1504        count = { }
1505        for e in elements:
1506            tb = e.get_attribute('testbed')
1507            count[tb] = count.get(tb, 0) + 1
1508
1509        for tb in tbs.keys():
1510            s = get_substrate_from_topo(sub.name, topo[tb])
1511            if s:
1512                for i in s.interfaces:
1513                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1514                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1515                    else: i.set_attribute('dragon_type', 'link')
1516            else:
1517                raise service_error(service_error.internal,
1518                        "No substrate %s in testbed %s" % (sub.name, tb))
1519
1520        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1521
1522    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1523            segment_substrate, portals):
1524        # More than one testbed is on this substrate.  Insert
1525        # some portals into the subtopologies.  st == source testbed,
1526        # dt == destination testbed.
1527        for st in tbs.keys():
1528            if not segment_substrate.has_key(st):
1529                segment_substrate[st] = { }
1530            if not portals.has_key(st): 
1531                portals[st] = { }
1532            for dt in [ t for t in tbs.keys() if t != st]:
1533                sproject = tbparams[st].get('project', 'project')
1534                dproject = tbparams[dt].get('project', 'project')
1535                mproject = tbparams[master].get('project', 'project')
1536                sdomain = tbparams[st].get('domain', ".example.com")
1537                ddomain = tbparams[dt].get('domain', ".example.com")
1538                mdomain = tbparams[master].get('domain', '.example.com')
1539                muser = tbparams[master].get('user', 'root')
1540                smbshare = tbparams[master].get('smbshare', 'USERS')
1541                aid = tbparams[dt]['allocID']['fedid']
1542                if st == master or dt == master:
1543                    active = ("%s" % (st == master))
1544                else:
1545                    active = ("%s" %(st > dt))
1546                if not segment_substrate[st].has_key(dt):
1547                    # Put a substrate and a segment for the connected
1548                    # testbed in there.
1549                    tsubstrate, segment_element = \
1550                            self.new_portal_substrate(st, dt, eid, tbparams)
1551                    segment_substrate[st][dt] = tsubstrate
1552                    topo[st].substrates.append(tsubstrate)
1553                    topo[st].elements.append(segment_element)
1554
1555                new_portal = False
1556                if portals[st].has_key(dt):
1557                    # There's a portal set up to go to this destination.
1558                    # See if there's room to multiples this connection on
1559                    # it.  If so, add an interface to the portal; if not,
1560                    # set up to add a portal below.
1561                    # [This little festival of braces is just a pop of the
1562                    # last element in the list of portals between st and
1563                    # dt.]
1564                    portal = portals[st][dt][-1]
1565                    mux = len([ i for i in portal.interface \
1566                            if not i.get_attribute('portal')])
1567                    if mux == self.muxmax:
1568                        new_portal = True
1569                        portal_type = "experiment"
1570                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1571                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1572                    else:
1573                        new_i = topdl.Interface(
1574                                substrate=sub.name,
1575                                attribute=[ 
1576                                    topdl.Attribute(
1577                                        attribute='ip4_address', 
1578                                        value=tbs[dt]
1579                                    )
1580                                ])
1581                        portal.interface.append(new_i)
1582                else:
1583                    # First connection to this testbed, make an empty list
1584                    # and set up to add the new portal below
1585                    new_portal = True
1586                    portals[st][dt] = [ ]
1587                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1588                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1589
1590                    if dt == master or st == master: portal_type = "both"
1591                    else: portal_type = "experiment"
1592
1593                if new_portal:
1594                    infs = (
1595                            (segment_substrate[st][dt].name, 
1596                                (('portal', 'true'),)),
1597                            (sub.name, 
1598                                (('ip4_address', tbs[dt]),))
1599                        )
1600                    portal =  self.new_portal_node(st, dt, tbparams, 
1601                            master, eid, myname, desthost, portal_type,
1602                            infs)
1603                    if self.fedkit:
1604                        self.add_kit(portal, self.fedkit)
1605                    if self.gatewaykit: 
1606                        self.add_kit(portal, self.gatewaykit)
1607
1608                    topo[st].elements.append(portal)
1609                    portals[st][dt].append(portal)
1610
1611    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1612        # Add to the master testbed
1613        tsubstrate, segment_element = \
1614                self.new_portal_substrate(st, dt, eid, tbparams)
1615        myname = "%stunnel" % dt
1616        desthost = "%stunnel" % st
1617
1618        portal = self.new_portal_node(st, dt, tbparams, master,
1619                eid, myname, desthost, "control", 
1620                ((tsubstrate.name,(('portal','true'),)),))
1621        if self.fedkit:
1622            self.add_kit(portal, self.fedkit)
1623        if self.gatewaykit: 
1624            self.add_kit(portal, self.gatewaykit)
1625
1626        topo[st].substrates.append(tsubstrate)
1627        topo[st].elements.append(segment_element)
1628        topo[st].elements.append(portal)
1629
1630    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1631            substrate, tbparams):
1632        # Add to the master testbed
1633        myname = "%stunnel" % dt
1634        desthost = "%s" % ip_addr(dip)
1635
1636        portal = self.new_portal_node(st, dt, tbparams, master,
1637                eid, myname, desthost, "control", 
1638                ((substrate.name,(
1639                    ('portal','true'),
1640                    ('ip4_address', "%s" % ip_addr(myip)),
1641                    ('dragon_vlan', 'unassigned%d' % idx),
1642                    ('dragon_type', 'link'),)),))
1643        if self.fedkit:
1644            self.add_kit(portal, self.fedkit)
1645        if self.gatewaykit: 
1646            self.add_kit(portal, self.gatewaykit)
1647
1648        return portal
1649
1650    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1651        """
1652        For each substrate in the main topology, find those that
1653        have nodes on more than one testbed.  Insert portal nodes
1654        into the copies of those substrates on the sub topologies.
1655        """
1656        segment_substrate = { }
1657        portals = { }
1658        for s in top.substrates:
1659            # tbs will contain an ip address on this subsrate that is in
1660            # each testbed.
1661            tbs = { }
1662            for i in s.interfaces:
1663                e = i.element
1664                tb = e.get_attribute('testbed')
1665                if tb and not tbs.has_key(tb):
1666                    for i in e.interface:
1667                        if s in i.subs:
1668                            tbs[tb]= i.get_attribute('ip4_address')
1669            if len(tbs) < 2:
1670                continue
1671
1672            # DRAGON will not create multi-site vlans yet
1673            if len(tbs) == 2 and \
1674                    all([tbparams[x].has_key('dragon') for x in tbs]):
1675                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1676                        master, eid)
1677            else:
1678                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1679                        eid, segment_substrate, portals)
1680
1681        # Make sure that all the slaves have a control portal back to the
1682        # master.
1683        for tb in [ t for t in tbparams.keys() if t != master ]:
1684            if len([e for e in topo[tb].elements \
1685                    if isinstance(e, topdl.Computer) and \
1686                    e.get_attribute('portal') and \
1687                    e.get_attribute('portal_type') == 'both']) == 0:
1688
1689                if tbparams[master].has_key('dragon') \
1690                        and tbparams[tb].has_key('dragon'):
1691
1692                    idx = len([x for x in topo.keys() \
1693                            if x.startswith('dragon')])
1694                    dip, leng = ip_allocator.allocate(4)
1695                    dip += 1
1696                    mip = dip+1
1697                    csub = topdl.Substrate(
1698                            name="dragon-control-%s" % tb,
1699                            capacity=topdl.Capacity(100000.0, 'max'),
1700                            attribute=[
1701                                topdl.Attribute(
1702                                    attribute='portal',
1703                                    value='true'
1704                                    )
1705                                ]
1706                            )
1707                    seg = topdl.Segment(
1708                            id= tbparams[master]['allocID'],
1709                            type='emulab',
1710                            uri = self.tbmap.get(master, None),
1711                            interface=[ 
1712                                topdl.Interface(
1713                                    substrate=csub.name),
1714                                ],
1715                            attribute = [
1716                                topdl.Attribute(attribute=n, value=v)
1717                                    for n, v in (\
1718                                        ('domain', 
1719                                            tbparams[master].get('domain',
1720                                                ".example.com")),
1721                                        ('experiment', "%s/%s" % \
1722                                                (tbparams[master].get(
1723                                                    'project', 
1724                                                    'project'), 
1725                                                    eid)),)
1726                                ],
1727                            )
1728                    topo[tb].substrates.append(csub)
1729                    topo[tb].elements.append(
1730                            self.new_dragon_portal(tb, master, master, eid, 
1731                                dip, mip, idx, csub, tbparams))
1732                    topo[tb].elements.append(seg)
1733
1734                    mcsub = csub.clone()
1735                    seg = topdl.Segment(
1736                            id= tbparams[tb]['allocID'],
1737                            type='emulab',
1738                            uri = self.tbmap.get(tb, None),
1739                            interface=[ 
1740                                topdl.Interface(
1741                                    substrate=csub.name),
1742                                ],
1743                            attribute = [
1744                                topdl.Attribute(attribute=n, value=v)
1745                                    for n, v in (\
1746                                        ('domain', 
1747                                            tbparams[tb].get('domain',
1748                                                ".example.com")),
1749                                        ('experiment', "%s/%s" % \
1750                                                (tbparams[tb].get('project', 
1751                                                    'project'), 
1752                                                    eid)),)
1753                                ],
1754                            )
1755                    topo[master].substrates.append(mcsub)
1756                    topo[master].elements.append(
1757                            self.new_dragon_portal(master, tb, master, eid, 
1758                                mip, dip, idx, mcsub, tbparams))
1759                    topo[master].elements.append(seg)
1760
1761                    self.create_dragon_substrate(csub, topo, 
1762                            {tb: 1, master:1}, tbparams, master, eid)
1763                else:
1764                    self.add_control_portal(master, tb, master, eid, topo, 
1765                            tbparams)
1766                    self.add_control_portal(tb, master, master, eid, topo, 
1767                            tbparams)
1768
1769        # Connect the portal nodes into the topologies and clear out
1770        # substrates that are not in the topologies
1771        for tb in tbparams.keys():
1772            topo[tb].incorporate_elements()
1773            topo[tb].substrates = \
1774                    [s for s in topo[tb].substrates \
1775                        if len(s.interfaces) >0]
1776
1777    def wrangle_software(self, expid, top, topo, tbparams):
1778        """
1779        Copy software out to the repository directory, allocate permissions and
1780        rewrite the segment topologies to look for the software in local
1781        places.
1782        """
1783
1784        # Copy the rpms and tarfiles to a distribution directory from
1785        # which the federants can retrieve them
1786        linkpath = "%s/software" %  expid
1787        softdir ="%s/%s" % ( self.repodir, linkpath)
1788        softmap = { }
1789        # These are in a list of tuples format (each kit).  This comprehension
1790        # unwraps them into a single list of tuples that initilaizes the set of
1791        # tuples.
1792        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1793                for p, t in l ])
1794        pkgs.update([x.location for e in top.elements \
1795                for x in e.software])
1796        try:
1797            os.makedirs(softdir)
1798        except IOError, e:
1799            raise service_error(
1800                    "Cannot create software directory: %s" % e)
1801        # The actual copying.  Everything's converted into a url for copying.
1802        for pkg in pkgs:
1803            loc = pkg
1804
1805            scheme, host, path = urlparse(loc)[0:3]
1806            dest = os.path.basename(path)
1807            if not scheme:
1808                if not loc.startswith('/'):
1809                    loc = "/%s" % loc
1810                loc = "file://%s" %loc
1811            try:
1812                u = urlopen(loc)
1813            except Exception, e:
1814                raise service_error(service_error.req, 
1815                        "Cannot open %s: %s" % (loc, e))
1816            try:
1817                f = open("%s/%s" % (softdir, dest) , "w")
1818                self.log.debug("Writing %s/%s" % (softdir,dest) )
1819                data = u.read(4096)
1820                while data:
1821                    f.write(data)
1822                    data = u.read(4096)
1823                f.close()
1824                u.close()
1825            except Exception, e:
1826                raise service_error(service_error.internal,
1827                        "Could not copy %s: %s" % (loc, e))
1828            path = re.sub("/tmp", "", linkpath)
1829            # XXX
1830            softmap[pkg] = \
1831                    "%s/%s/%s" %\
1832                    ( self.repo_url, path, dest)
1833
1834            # Allow the individual segments to access the software.
1835            for tb in tbparams.keys():
1836                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1837                        "/%s/%s" % ( path, dest))
1838
1839        # Convert the software locations in the segments into the local
1840        # copies on this host
1841        for soft in [ s for tb in topo.values() \
1842                for e in tb.elements \
1843                    if getattr(e, 'software', False) \
1844                        for s in e.software ]:
1845            if softmap.has_key(soft.location):
1846                soft.location = softmap[soft.location]
1847
1848
1849    def new_experiment(self, req, fid):
1850        """
1851        The external interface to empty initial experiment creation called from
1852        the dispatcher.
1853
1854        Creates a working directory, splits the incoming description using the
1855        splitter script and parses out the avrious subsections using the
1856        lcasses above.  Once each sub-experiment is created, use pooled threads
1857        to instantiate them and start it all up.
1858        """
1859        if not self.auth.check_attribute(fid, 'new'):
1860            raise service_error(service_error.access, "New access denied")
1861
1862        try:
1863            tmpdir = tempfile.mkdtemp(prefix="split-")
1864        except IOError:
1865            raise service_error(service_error.internal, "Cannot create tmp dir")
1866
1867        try:
1868            access_user = self.accessdb[fid]
1869        except KeyError:
1870            raise service_error(service_error.internal,
1871                    "Access map and authorizer out of sync in " + \
1872                            "new_experiment for fedid %s"  % fid)
1873
1874        pid = "dummy"
1875        gid = "dummy"
1876
1877        req = req.get('NewRequestBody', None)
1878        if not req:
1879            raise service_error(service_error.req,
1880                    "Bad request format (no NewRequestBody)")
1881
1882        # Generate an ID for the experiment (slice) and a certificate that the
1883        # allocator can use to prove they own it.  We'll ship it back through
1884        # the encrypted connection.
1885        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1886
1887        #now we're done with the tmpdir, and it should be empty
1888        if self.cleanup:
1889            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1890            os.rmdir(tmpdir)
1891        else:
1892            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1893
1894        eid = self.create_experiment_state(fid, req, expid, expcert, 
1895                state='empty')
1896
1897        # Let users touch the state
1898        self.auth.set_attribute(fid, expid)
1899        self.auth.set_attribute(expid, expid)
1900        # Override fedids can manipulate state as well
1901        for o in self.overrides:
1902            self.auth.set_attribute(o, expid)
1903
1904        rv = {
1905                'experimentID': [
1906                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1907                ],
1908                'experimentStatus': 'empty',
1909                'experimentAccess': { 'X509' : expcert }
1910            }
1911
1912        return rv
1913
1914
1915    def create_experiment(self, req, fid):
1916        """
1917        The external interface to experiment creation called from the
1918        dispatcher.
1919
1920        Creates a working directory, splits the incoming description using the
1921        splitter script and parses out the avrious subsections using the
1922        lcasses above.  Once each sub-experiment is created, use pooled threads
1923        to instantiate them and start it all up.
1924        """
1925
1926        req = req.get('CreateRequestBody', None)
1927        if not req:
1928            raise service_error(service_error.req,
1929                    "Bad request format (no CreateRequestBody)")
1930
1931        # Get the experiment access
1932        exp = req.get('experimentID', None)
1933        if exp:
1934            if exp.has_key('fedid'):
1935                key = exp['fedid']
1936                expid = key
1937                eid = None
1938            elif exp.has_key('localname'):
1939                key = exp['localname']
1940                eid = key
1941                expid = None
1942            else:
1943                raise service_error(service_error.req, "Unknown lookup type")
1944        else:
1945            raise service_error(service_error.req, "No request?")
1946
1947        self.check_experiment_access(fid, key)
1948
1949        try:
1950            tmpdir = tempfile.mkdtemp(prefix="split-")
1951            os.mkdir(tmpdir+"/keys")
1952        except IOError:
1953            raise service_error(service_error.internal, "Cannot create tmp dir")
1954
1955        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1956        gw_secretkey_base = "fed.%s" % self.ssh_type
1957        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1958        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1959        tclfile = tmpdir + "/experiment.tcl"
1960        tbparams = { }
1961        try:
1962            access_user = self.accessdb[fid]
1963        except KeyError:
1964            raise service_error(service_error.internal,
1965                    "Access map and authorizer out of sync in " + \
1966                            "create_experiment for fedid %s"  % fid)
1967
1968        pid = "dummy"
1969        gid = "dummy"
1970
1971        # The tcl parser needs to read a file so put the content into that file
1972        descr=req.get('experimentdescription', None)
1973        if descr:
1974            file_content=descr.get('ns2description', None)
1975            if file_content:
1976                try:
1977                    f = open(tclfile, 'w')
1978                    f.write(file_content)
1979                    f.close()
1980                except IOError:
1981                    raise service_error(service_error.internal,
1982                            "Cannot write temp experiment description")
1983            else:
1984                raise service_error(service_error.req, 
1985                        "Only ns2descriptions supported")
1986        else:
1987            raise service_error(service_error.req, "No experiment description")
1988
1989        self.state_lock.acquire()
1990        if self.state.has_key(key):
1991            for e in self.state[key].get('experimentID',[]):
1992                if not expid and e.has_key('fedid'):
1993                    expid = e['fedid']
1994                elif not eid and e.has_key('localname'):
1995                    eid = e['localname']
1996        self.state_lock.release()
1997
1998        if not (eid and expid):
1999            raise service_error(service_error.internal, 
2000                    "Cannot find local experiment info!?")
2001
2002        try: 
2003            # This catches exceptions to clear the placeholder if necessary
2004            try:
2005                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2006            except ValueError:
2007                raise service_error(service_error.server_config, 
2008                        "Bad key type (%s)" % self.ssh_type)
2009
2010            master = req.get('master', None)
2011            if not master:
2012                raise service_error(service_error.req,
2013                        "No master testbed label")
2014            export_project = req.get('exportProject', None)
2015            if not export_project:
2016                raise service_error(service_error.req, "No export project")
2017           
2018            # Translate to topdl
2019            if self.splitter_url:
2020                # XXX: need remote topdl translator
2021                self.log.debug("Calling remote splitter at %s" % \
2022                        self.splitter_url)
2023                split_data = self.remote_splitter(self.splitter_url,
2024                        file_content, master)
2025            else:
2026                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2027                    str(self.muxmax), '-m', master]
2028
2029                if self.fedkit:
2030                    tclcmd.append('-k')
2031
2032                if self.gatewaykit:
2033                    tclcmd.append('-K')
2034
2035                tclcmd.extend([pid, gid, eid, tclfile])
2036
2037                self.log.debug("running local splitter %s", " ".join(tclcmd))
2038                # This is just fantastic.  As a side effect the parser copies
2039                # tb_compat.tcl into the current directory, so that directory
2040                # must be writable by the fedd user.  Doing this in the
2041                # temporary subdir ensures this is the case.
2042                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2043                        cwd=tmpdir)
2044                split_data = tclparser.stdout
2045
2046            top = topdl.topology_from_xml(file=split_data, top="experiment")
2047
2048            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2049             # Find the testbeds to look up
2050            testbeds = set([ a.value for e in top.elements \
2051                    for a in e.attribute \
2052                    if a.attribute == 'testbed'] )
2053
2054            allocated = { }         # Testbeds we can access
2055            topo ={ }               # Sub topologies
2056            self.get_access_to_testbeds(testbeds, access_user, 
2057                    export_project, master, allocated, tbparams)
2058            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2059
2060            # Copy configuration files into the remote file store
2061            # The config urlpath
2062            configpath = "/%s/config" % expid
2063            # The config file system location
2064            configdir ="%s%s" % ( self.repodir, configpath)
2065            try:
2066                os.makedirs(configdir)
2067            except IOError, e:
2068                raise service_error(
2069                        "Cannot create config directory: %s" % e)
2070            try:
2071                f = open("%s/hosts" % configdir, "w")
2072                f.write('\n'.join(hosts))
2073                f.close()
2074            except IOError, e:
2075                raise service_error(service_error.internal, 
2076                        "Cannot write hosts file: %s" % e)
2077            try:
2078                copy_file("%s" % gw_pubkey, "%s/%s" % \
2079                        (configdir, gw_pubkey_base))
2080                copy_file("%s" % gw_secretkey, "%s/%s" % \
2081                        (configdir, gw_secretkey_base))
2082            except IOError, e:
2083                raise service_error(service_error.internal, 
2084                        "Cannot copy keyfiles: %s" % e)
2085
2086            # Allow the individual testbeds to access the configuration files.
2087            for tb in tbparams.keys():
2088                asignee = tbparams[tb]['allocID']['fedid']
2089                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2090                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2091
2092            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2093            # Now get access to the dynamic testbeds
2094            for k, t in topo.items():
2095                if not t.get_attribute('dynamic'):
2096                    continue
2097                tb = t.get_attribute('testbed')
2098                if tb: 
2099                    self.get_access(tb, None, user, tbparams, master, 
2100                            export_project, access_user)
2101                    tbparams[k] = tbparams[tb]
2102                    del tbparams[tb]
2103                    allocated[k] = 1
2104                else:
2105                    raise service_error(service_error.internal, 
2106                            "Dynamic allocation from no testbed!?")
2107
2108            self.wrangle_software(expid, top, topo, tbparams)
2109
2110            vtopo = topdl.topology_to_vtopo(top)
2111            vis = self.genviz(vtopo)
2112
2113            # save federant information
2114            for k in allocated.keys():
2115                tbparams[k]['federant'] = {
2116                        'name': [ { 'localname' : eid} ],
2117                        'allocID' : tbparams[k]['allocID'],
2118                        'master' : k == master,
2119                        'uri': tbparams[k]['uri'],
2120                    }
2121                if tbparams[k].has_key('emulab'):
2122                        tbparams[k]['federant']['emulab'] = \
2123                                tbparams[k]['emulab']
2124
2125            self.state_lock.acquire()
2126            self.state[eid]['vtopo'] = vtopo
2127            self.state[eid]['vis'] = vis
2128            self.state[expid]['federant'] = \
2129                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2130                        if tbparams[tb].has_key('federant') ]
2131            if self.state_filename: 
2132                self.write_state()
2133            self.state_lock.release()
2134        except service_error, e:
2135            # If something goes wrong in the parse (usually an access error)
2136            # clear the placeholder state.  From here on out the code delays
2137            # exceptions.  Failing at this point returns a fault to the remote
2138            # caller.
2139
2140            self.state_lock.acquire()
2141            del self.state[eid]
2142            del self.state[expid]
2143            if self.state_filename: self.write_state()
2144            self.state_lock.release()
2145            raise e
2146
2147
2148        # Start the background swapper and return the starting state.  From
2149        # here on out, the state will stick around a while.
2150
2151        # Let users touch the state
2152        self.auth.set_attribute(fid, expid)
2153        self.auth.set_attribute(expid, expid)
2154        # Override fedids can manipulate state as well
2155        for o in self.overrides:
2156            self.auth.set_attribute(o, expid)
2157
2158        # Create a logger that logs to the experiment's state object as well as
2159        # to the main log file.
2160        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2161        alloc_collector = self.list_log(self.state[eid]['log'])
2162        h = logging.StreamHandler(alloc_collector)
2163        # XXX: there should be a global one of these rather than repeating the
2164        # code.
2165        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2166                    '%d %b %y %H:%M:%S'))
2167        alloc_log.addHandler(h)
2168       
2169        attrs = [ 
2170                {
2171                    'attribute': 'ssh_pubkey', 
2172                    'value': '%s/%s/config/%s' % \
2173                            (self.repo_url, expid, gw_pubkey_base)
2174                },
2175                {
2176                    'attribute': 'ssh_secretkey', 
2177                    'value': '%s/%s/config/%s' % \
2178                            (self.repo_url, expid, gw_secretkey_base)
2179                },
2180                {
2181                    'attribute': 'hosts', 
2182                    'value': '%s/%s/config/hosts' % \
2183                            (self.repo_url, expid)
2184                },
2185                {
2186                    'attribute': 'experiment_name',
2187                    'value': eid,
2188                },
2189            ]
2190
2191        # Start a thread to do the resource allocation
2192        t  = Thread(target=self.allocate_resources,
2193                args=(allocated, master, eid, expid, tbparams, 
2194                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2195                name=eid)
2196        t.start()
2197
2198        rv = {
2199                'experimentID': [
2200                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2201                ],
2202                'experimentStatus': 'starting',
2203            }
2204
2205        return rv
2206   
2207    def get_experiment_fedid(self, key):
2208        """
2209        find the fedid associated with the localname key in the state database.
2210        """
2211
2212        rv = None
2213        self.state_lock.acquire()
2214        if self.state.has_key(key):
2215            if isinstance(self.state[key], dict):
2216                try:
2217                    kl = [ f['fedid'] for f in \
2218                            self.state[key]['experimentID']\
2219                                if f.has_key('fedid') ]
2220                except KeyError:
2221                    self.state_lock.release()
2222                    raise service_error(service_error.internal, 
2223                            "No fedid for experiment %s when getting "+\
2224                                    "fedid(!?)" % key)
2225                if len(kl) == 1:
2226                    rv = kl[0]
2227                else:
2228                    self.state_lock.release()
2229                    raise service_error(service_error.internal, 
2230                            "multiple fedids for experiment %s when " +\
2231                                    "getting fedid(!?)" % key)
2232            else:
2233                self.state_lock.release()
2234                raise service_error(service_error.internal, 
2235                        "Unexpected state for %s" % key)
2236        self.state_lock.release()
2237        return rv
2238
2239    def check_experiment_access(self, fid, key):
2240        """
2241        Confirm that the fid has access to the experiment.  Though a request
2242        may be made in terms of a local name, the access attribute is always
2243        the experiment's fedid.
2244        """
2245        if not isinstance(key, fedid):
2246            key = self.get_experiment_fedid(key)
2247
2248        if self.auth.check_attribute(fid, key):
2249            return True
2250        else:
2251            raise service_error(service_error.access, "Access Denied")
2252
2253
2254    def get_handler(self, path, fid):
2255        self.log.info("Get handler %s %s" % (path, fid))
2256        if self.auth.check_attribute(fid, path):
2257            return ("%s/%s" % (self.repodir, path), "application/binary")
2258        else:
2259            return (None, None)
2260
2261    def get_vtopo(self, req, fid):
2262        """
2263        Return the stored virtual topology for this experiment
2264        """
2265        rv = None
2266        state = None
2267
2268        req = req.get('VtopoRequestBody', None)
2269        if not req:
2270            raise service_error(service_error.req,
2271                    "Bad request format (no VtopoRequestBody)")
2272        exp = req.get('experiment', None)
2273        if exp:
2274            if exp.has_key('fedid'):
2275                key = exp['fedid']
2276                keytype = "fedid"
2277            elif exp.has_key('localname'):
2278                key = exp['localname']
2279                keytype = "localname"
2280            else:
2281                raise service_error(service_error.req, "Unknown lookup type")
2282        else:
2283            raise service_error(service_error.req, "No request?")
2284
2285        self.check_experiment_access(fid, key)
2286
2287        self.state_lock.acquire()
2288        if self.state.has_key(key):
2289            if self.state[key].has_key('vtopo'):
2290                rv = { 'experiment' : {keytype: key },\
2291                        'vtopo': self.state[key]['vtopo'],\
2292                    }
2293            else:
2294                state = self.state[key]['experimentStatus']
2295        self.state_lock.release()
2296
2297        if rv: return rv
2298        else: 
2299            if state:
2300                raise service_error(service_error.partial, 
2301                        "Not ready: %s" % state)
2302            else:
2303                raise service_error(service_error.req, "No such experiment")
2304
2305    def get_vis(self, req, fid):
2306        """
2307        Return the stored visualization for this experiment
2308        """
2309        rv = None
2310        state = None
2311
2312        req = req.get('VisRequestBody', None)
2313        if not req:
2314            raise service_error(service_error.req,
2315                    "Bad request format (no VisRequestBody)")
2316        exp = req.get('experiment', None)
2317        if exp:
2318            if exp.has_key('fedid'):
2319                key = exp['fedid']
2320                keytype = "fedid"
2321            elif exp.has_key('localname'):
2322                key = exp['localname']
2323                keytype = "localname"
2324            else:
2325                raise service_error(service_error.req, "Unknown lookup type")
2326        else:
2327            raise service_error(service_error.req, "No request?")
2328
2329        self.check_experiment_access(fid, key)
2330
2331        self.state_lock.acquire()
2332        if self.state.has_key(key):
2333            if self.state[key].has_key('vis'):
2334                rv =  { 'experiment' : {keytype: key },\
2335                        'vis': self.state[key]['vis'],\
2336                        }
2337            else:
2338                state = self.state[key]['experimentStatus']
2339        self.state_lock.release()
2340
2341        if rv: return rv
2342        else:
2343            if state:
2344                raise service_error(service_error.partial, 
2345                        "Not ready: %s" % state)
2346            else:
2347                raise service_error(service_error.req, "No such experiment")
2348
2349    def clean_info_response(self, rv):
2350        """
2351        Remove the information in the experiment's state object that is not in
2352        the info response.
2353        """
2354        # Remove the owner info (should always be there, but...)
2355        if rv.has_key('owner'): del rv['owner']
2356
2357        # Convert the log into the allocationLog parameter and remove the
2358        # log entry (with defensive programming)
2359        if rv.has_key('log'):
2360            rv['allocationLog'] = "".join(rv['log'])
2361            del rv['log']
2362        else:
2363            rv['allocationLog'] = ""
2364
2365        if rv['experimentStatus'] != 'active':
2366            if rv.has_key('federant'): del rv['federant']
2367        else:
2368            # remove the allocationID and uri info from each federant
2369            for f in rv.get('federant', []):
2370                if f.has_key('allocID'): del f['allocID']
2371                if f.has_key('uri'): del f['uri']
2372        return rv
2373
2374    def get_info(self, req, fid):
2375        """
2376        Return all the stored info about this experiment
2377        """
2378        rv = None
2379
2380        req = req.get('InfoRequestBody', None)
2381        if not req:
2382            raise service_error(service_error.req,
2383                    "Bad request format (no InfoRequestBody)")
2384        exp = req.get('experiment', None)
2385        if exp:
2386            if exp.has_key('fedid'):
2387                key = exp['fedid']
2388                keytype = "fedid"
2389            elif exp.has_key('localname'):
2390                key = exp['localname']
2391                keytype = "localname"
2392            else:
2393                raise service_error(service_error.req, "Unknown lookup type")
2394        else:
2395            raise service_error(service_error.req, "No request?")
2396
2397        self.check_experiment_access(fid, key)
2398
2399        # The state may be massaged by the service function that called
2400        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2401        # state.
2402        self.state_lock.acquire()
2403        if self.state.has_key(key):
2404            rv = copy.deepcopy(self.state[key])
2405        self.state_lock.release()
2406
2407        if rv:
2408            return self.clean_info_response(rv)
2409        else:
2410            raise service_error(service_error.req, "No such experiment")
2411
2412    def get_multi_info(self, req, fid):
2413        """
2414        Return all the stored info that this fedid can access
2415        """
2416        rv = { 'info': [ ] }
2417
2418        self.state_lock.acquire()
2419        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2420            try:
2421                self.check_experiment_access(fid, key)
2422            except service_error, e:
2423                if e.code == service_error.access:
2424                    continue
2425                else:
2426                    self.state_lock.release()
2427                    raise e
2428
2429            if self.state.has_key(key):
2430                e = copy.deepcopy(self.state[key])
2431                e = self.clean_info_response(e)
2432                rv['info'].append(e)
2433        self.state_lock.release()
2434        return rv
2435
2436    def terminate_experiment(self, req, fid):
2437        """
2438        Swap this experiment out on the federants and delete the shared
2439        information
2440        """
2441        tbparams = { }
2442        req = req.get('TerminateRequestBody', None)
2443        if not req:
2444            raise service_error(service_error.req,
2445                    "Bad request format (no TerminateRequestBody)")
2446        force = req.get('force', False)
2447        exp = req.get('experiment', None)
2448        if exp:
2449            if exp.has_key('fedid'):
2450                key = exp['fedid']
2451                keytype = "fedid"
2452            elif exp.has_key('localname'):
2453                key = exp['localname']
2454                keytype = "localname"
2455            else:
2456                raise service_error(service_error.req, "Unknown lookup type")
2457        else:
2458            raise service_error(service_error.req, "No request?")
2459
2460        self.check_experiment_access(fid, key)
2461
2462        dealloc_list = [ ]
2463
2464
2465        # Create a logger that logs to the dealloc_list as well as to the main
2466        # log file.
2467        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2468        h = logging.StreamHandler(self.list_log(dealloc_list))
2469        # XXX: there should be a global one of these rather than repeating the
2470        # code.
2471        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2472                    '%d %b %y %H:%M:%S'))
2473        dealloc_log.addHandler(h)
2474
2475        self.state_lock.acquire()
2476        fed_exp = self.state.get(key, None)
2477
2478        if fed_exp:
2479            # This branch of the conditional holds the lock to generate a
2480            # consistent temporary tbparams variable to deallocate experiments.
2481            # It releases the lock to do the deallocations and reacquires it to
2482            # remove the experiment state when the termination is complete.
2483
2484            # First make sure that the experiment creation is complete.
2485            status = fed_exp.get('experimentStatus', None)
2486
2487            if status:
2488                if status in ('starting', 'terminating'):
2489                    if not force:
2490                        self.state_lock.release()
2491                        raise service_error(service_error.partial, 
2492                                'Experiment still being created or destroyed')
2493                    else:
2494                        self.log.warning('Experiment in %s state ' % status + \
2495                                'being terminated by force.')
2496            else:
2497                # No status??? trouble
2498                self.state_lock.release()
2499                raise service_error(service_error.internal,
2500                        "Experiment has no status!?")
2501
2502            ids = []
2503            #  experimentID is a list of dicts that are self-describing
2504            #  identifiers.  This finds all the fedids and localnames - the
2505            #  keys of self.state - and puts them into ids.
2506            for id in fed_exp.get('experimentID', []):
2507                if id.has_key('fedid'): ids.append(id['fedid'])
2508                if id.has_key('localname'): ids.append(id['localname'])
2509
2510            # Collect the allocation/segment ids into a dict keyed by the fedid
2511            # of the allocation (or a monotonically increasing integer) that
2512            # contains a tuple of uri, aid (which is a dict...)
2513            for i, fed in enumerate(fed_exp.get('federant', [])):
2514                try:
2515                    uri = fed['uri']
2516                    aid = fed['allocID']
2517                    k = fed['allocID'].get('fedid', i)
2518                except KeyError, e:
2519                    continue
2520                tbparams[k] = (uri, aid)
2521            fed_exp['experimentStatus'] = 'terminating'
2522            if self.state_filename: self.write_state()
2523            self.state_lock.release()
2524
2525            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2526            # then completes, so we can't wait if nothing starts.  So, no
2527            # tbparams, no start.
2528            if len(tbparams) > 0:
2529                thread_pool = self.thread_pool(self.nthreads)
2530                for k in tbparams.keys():
2531                    # Create and start a thread to stop the segment
2532                    thread_pool.wait_for_slot()
2533                    uri, aid = tbparams[k]
2534                    t  = self.pooled_thread(\
2535                            target=self.terminate_segment(log=dealloc_log,
2536                                testbed=uri,
2537                                cert_file=self.cert_file, 
2538                                cert_pwd=self.cert_pwd,
2539                                trusted_certs=self.trusted_certs,
2540                                caller=self.call_TerminateSegment),
2541                            args=(uri, aid), name=k,
2542                            pdata=thread_pool, trace_file=self.trace_file)
2543                    t.start()
2544                # Wait for completions
2545                thread_pool.wait_for_all_done()
2546
2547            # release the allocations (failed experiments have done this
2548            # already, and starting experiments may be in odd states, so we
2549            # ignore errors releasing those allocations
2550            try: 
2551                for k in tbparams.keys():
2552                    # This releases access by uri
2553                    uri, aid = tbparams[k]
2554                    self.release_access(None, aid, uri=uri)
2555            except service_error, e:
2556                if status != 'failed' and not force:
2557                    raise e
2558
2559            # Remove the terminated experiment
2560            self.state_lock.acquire()
2561            for id in ids:
2562                if self.state.has_key(id): del self.state[id]
2563
2564            if self.state_filename: self.write_state()
2565            self.state_lock.release()
2566
2567            return { 
2568                    'experiment': exp , 
2569                    'deallocationLog': "".join(dealloc_list),
2570                    }
2571        else:
2572            # Don't forget to release the lock
2573            self.state_lock.release()
2574            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.