source: fedd/federation/experiment_control.py @ 7183b48

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 7183b48 was 7183b48, checked in by Ted Faber <faber@…>, 14 years ago

Split out experiment creation into two parts

  • Property mode set to 100644
File size: 89.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, user, tbparam, master, export_project,
728            access_user):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # currently this lumps all users into one service access group
738        service_keys = [ a for u in user \
739                for a in u.get('access', []) \
740                    if a.has_key('sshPubkey')]
741
742        if len(service_keys) == 0:
743            raise service_error(service_error.req, 
744                    "Must have at least one SSH pubkey for services")
745
746        # Tweak search order so that if there are entries in access_user that
747        # have a project matching the export project, we try them first
748        if export_project and export_project.has_key('localname'): 
749            pn = export_project['localname'] 
750               
751            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
752            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
753        else: 
754            access_sequence = access_user
755
756        for p, u in access_sequence: 
757            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
758                    "to %s") %  ((p or "None"), u, uri))
759
760            if p:
761                # Request with user and project specified
762                req = {\
763                        'destinationTestbed' : { 'uri' : uri },
764                        'project': { 
765                            'name': {'localname': p},
766                            'user': [ {'userID': { 'localname': u } } ],
767                            },
768                        'user':  user,
769                        'allocID' : { 'localname': 'test' },
770                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
771                        'serviceAccess' : service_keys
772                    }
773            else:
774                # Request with only user specified
775                req = {\
776                        'destinationTestbed' : { 'uri' : uri },
777                        'user':  [ {'userID': { 'localname': u } } ],
778                        'allocID' : { 'localname': 'test' },
779                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
780                        'serviceAccess' : service_keys
781                    }
782
783            if tb == master:
784                # NB, the export_project parameter is a dict that includes
785                # the type
786                req['exportProject'] = export_project
787
788            # node resources if any
789            if nodes != None and len(nodes) > 0:
790                rnodes = [ ]
791                for n in nodes:
792                    rn = { }
793                    image, hw, count = n.split(":")
794                    if image: rn['image'] = [ image ]
795                    if hw: rn['hardware'] = [ hw ]
796                    if count and int(count) >0 : rn['count'] = int(count)
797                    rnodes.append(rn)
798                req['resources']= { }
799                req['resources']['node'] = rnodes
800
801            try:
802                if self.local_access.has_key(uri):
803                    # Local access call
804                    req = { 'RequestAccessRequestBody' : req }
805                    r = self.local_access[uri].RequestAccess(req, 
806                            fedid(file=self.cert_file))
807                    r = { 'RequestAccessResponseBody' : r }
808                else:
809                    r = self.call_RequestAccess(uri, req, 
810                            self.cert_file, self.cert_pwd, self.trusted_certs)
811            except service_error, e:
812                if e.code == service_error.access:
813                    self.log.debug("[get_access] Access denied")
814                    r = None
815                    continue
816                else:
817                    raise e
818
819            if r.has_key('RequestAccessResponseBody'):
820                # Through to here we have a valid response, not a fault.
821                # Access denied is a fault, so something better or worse than
822                # access denied has happened.
823                r = r['RequestAccessResponseBody']
824                self.log.debug("[get_access] Access granted")
825                break
826            else:
827                raise service_error(service_error.protocol,
828                        "Bad proxy response")
829       
830        if not r:
831            raise service_error(service_error.access, 
832                    "Access denied by %s (%s)" % (tb, uri))
833
834        if r.has_key('emulab'):
835            e = r['emulab']
836            p = e['project']
837            tbparam[tb] = { 
838                    "boss": e['boss'],
839                    "host": e['ops'],
840                    "domain": e['domain'],
841                    "fs": e['fileServer'],
842                    "eventserver": e['eventServer'],
843                    "project": unpack_id(p['name']),
844                    "emulab" : e,
845                    "allocID" : r['allocID'],
846                    "uri": uri,
847                    }
848            # Make the testbed name be the label the user applied
849            p['testbed'] = {'localname': tb }
850
851            for u in p['user']:
852                role = u.get('role', None)
853                if role == 'experimentCreation':
854                    tbparam[tb]['user'] = unpack_id(u['userID'])
855                    break
856            else:
857                raise service_error(service_error.internal, 
858                        "No createExperimentUser from %s" %tb)
859            # Add attributes to parameter space.  We don't allow attributes to
860            # overlay any parameters already installed.
861            for a in e['fedAttr']:
862                try:
863                    if a['attribute'] and \
864                            isinstance(a['attribute'], basestring)\
865                            and not tbparam[tb].has_key(a['attribute'].lower()):
866                        tbparam[tb][a['attribute'].lower()] = a['value']
867                except KeyError:
868                    self.log.error("Bad attribute in response: %s" % a)
869        else:
870            tbparam[tb] = { 
871                "allocID" : r['allocID'],
872                "uri": uri,
873            }
874
875    def release_access(self, tb, aid, uri=None):
876        """
877        Release access to testbed through fedd
878        """
879
880        if not uri:
881            uri = self.tbmap.get(tb, None)
882        if not uri:
883            raise service_error(service_error.server_config, 
884                    "Unknown testbed: %s" % tb)
885
886        if self.local_access.has_key(uri):
887            resp = self.local_access[uri].ReleaseAccess(\
888                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
889                    fedid(file=self.cert_file))
890            resp = { 'ReleaseAccessResponseBody': resp } 
891        else:
892            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
893                    self.cert_file, self.cert_pwd, self.trusted_certs)
894
895        # better error coding
896
897    def remote_splitter(self, uri, desc, master):
898
899        req = {
900                'description' : { 'ns2description': desc },
901                'master': master,
902                'include_fedkit': bool(self.fedkit),
903                'include_gatewaykit': bool(self.gatewaykit)
904            }
905
906        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
907                self.trusted_certs)
908
909        if r.has_key('Ns2SplitResponseBody'):
910            r = r['Ns2SplitResponseBody']
911            if r.has_key('output'):
912                return r['output'].splitlines()
913            else:
914                raise service_error(service_error.protocol, 
915                        "Bad splitter response (no output)")
916        else:
917            raise service_error(service_error.protocol, "Bad splitter response")
918
919    class start_segment:
920        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
921                cert_pwd=None, trusted_certs=None, caller=None,
922                log_collector=None):
923            self.log = log
924            self.debug = debug
925            self.cert_file = cert_file
926            self.cert_pwd = cert_pwd
927            self.trusted_certs = None
928            self.caller = caller
929            self.testbed = testbed
930            self.log_collector = log_collector
931            self.response = None
932
933        def __call__(self, uri, aid, topo, master, attrs=None):
934            req = {
935                    'allocID': { 'fedid' : aid }, 
936                    'segmentdescription': { 
937                        'topdldescription': topo.to_dict(),
938                    },
939                    'master': master,
940                }
941            if attrs:
942                req['fedAttr'] = attrs
943
944            try:
945                self.log.debug("Calling StartSegment at %s " % uri)
946                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
947                        self.trusted_certs)
948                if r.has_key('StartSegmentResponseBody'):
949                    lval = r['StartSegmentResponseBody'].get('allocationLog',
950                            None)
951                    if lval and self.log_collector:
952                        for line in  lval.splitlines(True):
953                            self.log_collector.write(line)
954                    self.response = r
955                else:
956                    raise service_error(service_error.internal, 
957                            "Bad response!?: %s" %r)
958                return True
959            except service_error, e:
960                self.log.error("Start segment failed on %s: %s" % \
961                        (self.testbed, e))
962                return False
963
964
965
966    class terminate_segment:
967        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
968                cert_pwd=None, trusted_certs=None, caller=None):
969            self.log = log
970            self.debug = debug
971            self.cert_file = cert_file
972            self.cert_pwd = cert_pwd
973            self.trusted_certs = None
974            self.caller = caller
975            self.testbed = testbed
976
977        def __call__(self, uri, aid ):
978            req = {
979                    'allocID': aid , 
980                }
981            try:
982                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
983                        self.trusted_certs)
984                return True
985            except service_error, e:
986                self.log.error("Terminate segment failed on %s: %s" % \
987                        (self.testbed, e))
988                return False
989   
990
991    def allocate_resources(self, allocated, master, eid, expid, 
992            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
993            attrs=None):
994        def get_vlan(r):
995            if r.has_key('StartSegmentResponseBody'):
996                srb = r['StartSegmentResponseBody']
997                if srb.has_key('fedAttr'):
998                    for k, v in [ (a['attribute'], a['value']) \
999                            for a in srb['fedAttr']]:
1000                        if k == 'vlan': return v
1001            return None
1002
1003        started = { }           # Testbeds where a sub-experiment started
1004                                # successfully
1005
1006        # XXX
1007        fail_soft = False
1008
1009        slaves = [ k for k in allocated.keys() \
1010                if k != master and not topo[k].get_attribute('transit')]
1011        transit = [ k for k in allocated.keys() \
1012                if topo[k].get_attribute('transit')]
1013
1014        log = alloc_log or self.log
1015
1016        thread_pool = self.thread_pool(self.nthreads)
1017        threads = [ ]
1018
1019        for tb in transit:
1020            uri = tbparams[tb]['uri']
1021            if tbparams[tb].has_key('allocID') and \
1022                    tbparams[tb]['allocID'].has_key('fedid'):
1023                aid = tbparams[tb]['allocID']['fedid']
1024            else:
1025                raise service_error(service_error.internal, 
1026                        "No alloc id for testbed %s !?" % tb)
1027
1028            m = re.search('(\d+)', tb)
1029            if m:
1030                to_repl = "unassigned%s" % m.group(1)
1031            else:
1032                raise service_error(service_error.internal, 
1033                        "Bad dynamic allocation name")
1034                break
1035
1036            ss = self.start_segment(log=log, debug=self.debug, 
1037                testbed=tb, cert_file=self.cert_file, 
1038                cert_pwd=self.cert_pwd, 
1039                trusted_certs=self.trusted_certs,
1040                caller=self.call_StartSegment,
1041                log_collector=log_collector)
1042            t = self.pooled_thread(
1043                    target=ss,
1044                    args =(uri, aid, topo[tb], False, attrs),
1045                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1046            threads.append(t)
1047            t.start()
1048            # Wait until the this transit node finishes (keep pinging the log,
1049            # though)
1050
1051            mins = 0
1052            while not thread_pool.wait_for_all_done(60.0):
1053                mins += 1
1054                alloc_log.info("Waiting for transit (it has been %d mins)" \
1055                        % mins)
1056
1057            if t.rv:
1058                vlan = get_vlan(ss.response)
1059                if vlan is not None:
1060                    for k, t in topo.items():
1061                        for e in t.elements:
1062                            for i in e.interface:
1063                                vl = i.get_attribute('dragon_vlan')
1064                                if vl is not None and vl == to_repl:
1065                                    i.set_attribute('dragon_vlan', vlan)
1066            else:
1067                break
1068            thread_pool.clear()
1069
1070
1071        failed = [ t.getName() for t in threads if not t.rv ]
1072
1073        if len(failed) == 0:
1074            for tb in slaves:
1075                # Create and start a thread to start the segment, and save it
1076                # to get the return value later
1077                thread_pool.wait_for_slot()
1078                uri = self.tbmap.get(tb, None)
1079                if not uri:
1080                    raise service_error(service_error.internal, 
1081                            "Unknown testbed %s !?" % tb)
1082
1083                if tbparams[tb].has_key('allocID') and \
1084                        tbparams[tb]['allocID'].has_key('fedid'):
1085                    aid = tbparams[tb]['allocID']['fedid']
1086                else:
1087                    raise service_error(service_error.internal, 
1088                            "No alloc id for testbed %s !?" % tb)
1089
1090                t  = self.pooled_thread(\
1091                        target=self.start_segment(log=log, debug=self.debug,
1092                            testbed=tb, cert_file=self.cert_file,
1093                            cert_pwd=self.cert_pwd,
1094                            trusted_certs=self.trusted_certs,
1095                            caller=self.call_StartSegment,
1096                            log_collector=log_collector), 
1097                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1098                        pdata=thread_pool, trace_file=self.trace_file)
1099                threads.append(t)
1100                t.start()
1101
1102            # Wait until all finish (keep pinging the log, though)
1103            mins = 0
1104            while not thread_pool.wait_for_all_done(60.0):
1105                mins += 1
1106                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1107                        % mins)
1108
1109            thread_pool.clear()
1110
1111        # If none failed, start the master
1112        failed = [ t.getName() for t in threads if not t.rv ]
1113
1114        if len(failed) == 0:
1115            uri = self.tbmap.get(master, None)
1116            if not uri:
1117                raise service_error(service_error.internal, 
1118                        "Unknown testbed %s !?" % master)
1119
1120            if tbparams[master].has_key('allocID') and \
1121                    tbparams[master]['allocID'].has_key('fedid'):
1122                aid = tbparams[master]['allocID']['fedid']
1123            else:
1124                raise service_error(service_error.internal, 
1125                    "No alloc id for testbed %s !?" % master)
1126            t = self.pooled_thread(
1127                    target=self.start_segment(log=log, debug=self.debug, 
1128                        testbed=master, cert_file=self.cert_file, 
1129                        cert_pwd=self.cert_pwd, 
1130                        trusted_certs=self.trusted_certs,
1131                        caller=self.call_StartSegment,
1132                        log_collector=log_collector),
1133                    args =(uri, aid, topo[master], True, attrs),
1134                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1135            threads.append(t)
1136            t.start()
1137            # Wait until the master finishes (keep pinging the log, though)
1138            mins = 0
1139            while not thread_pool.wait_for_all_done(60.0):
1140                mins += 1
1141                alloc_log.info("Waiting for master (it has been %d mins)" \
1142                        % mins)
1143            # update failed to include the master, if it failed
1144            failed = [ t.getName() for t in threads if not t.rv ]
1145
1146        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1147        # If one failed clean up, unless fail_soft is set
1148        if failed:
1149            if not fail_soft:
1150                thread_pool.clear()
1151                for tb in succeeded:
1152                    # Create and start a thread to stop the segment
1153                    thread_pool.wait_for_slot()
1154                    uri = tbparams[tb]['uri']
1155                    t  = self.pooled_thread(\
1156                            target=self.terminate_segment(log=log,
1157                                testbed=tb,
1158                                cert_file=self.cert_file, 
1159                                cert_pwd=self.cert_pwd,
1160                                trusted_certs=self.trusted_certs,
1161                                caller=self.call_TerminateSegment),
1162                            args=(uri, tbparams[tb]['federant']['allocID']),
1163                            name=tb,
1164                            pdata=thread_pool, trace_file=self.trace_file)
1165                    t.start()
1166                # Wait until all finish
1167                thread_pool.wait_for_all_done()
1168
1169                # release the allocations
1170                for tb in tbparams.keys():
1171                    self.release_access(tb, tbparams[tb]['allocID'],
1172                            tbparams[tb].get('uri', None))
1173                # Remove the placeholder
1174                self.state_lock.acquire()
1175                self.state[eid]['experimentStatus'] = 'failed'
1176                if self.state_filename: self.write_state()
1177                self.state_lock.release()
1178
1179                log.error("Swap in failed on %s" % ",".join(failed))
1180                return
1181        else:
1182            log.info("[start_segment]: Experiment %s active" % eid)
1183
1184
1185        # Walk up tmpdir, deleting as we go
1186        if self.cleanup:
1187            log.debug("[start_experiment]: removing %s" % tmpdir)
1188            for path, dirs, files in os.walk(tmpdir, topdown=False):
1189                for f in files:
1190                    os.remove(os.path.join(path, f))
1191                for d in dirs:
1192                    os.rmdir(os.path.join(path, d))
1193            os.rmdir(tmpdir)
1194        else:
1195            log.debug("[start_experiment]: not removing %s" % tmpdir)
1196
1197        # Insert the experiment into our state and update the disk copy
1198        self.state_lock.acquire()
1199        self.state[expid]['experimentStatus'] = 'active'
1200        self.state[eid] = self.state[expid]
1201        if self.state_filename: self.write_state()
1202        self.state_lock.release()
1203        return
1204
1205
1206    def add_kit(self, e, kit):
1207        """
1208        Add a Software object created from the list of (install, location)
1209        tuples passed as kit  to the software attribute of an object e.  We
1210        do this enough to break out the code, but it's kind of a hack to
1211        avoid changing the old tuple rep.
1212        """
1213
1214        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1215
1216        if isinstance(e.software, list): e.software.extend(s)
1217        else: e.software = s
1218
1219
1220    def create_experiment_state(self, fid, req, expid, expcert, 
1221            state='starting'):
1222        """
1223        Create the initial entry in the experiment's state.  The expid and
1224        expcert are the experiment's fedid and certifacte that represents that
1225        ID, which are installed in the experiment state.  If the request
1226        includes a suggested local name that is used if possible.  If the local
1227        name is already taken by an experiment owned by this user that has
1228        failed, it is overwritten.  Otherwise new letters are added until a
1229        valid localname is found.  The generated local name is returned.
1230        """
1231
1232        if req.has_key('experimentID') and \
1233                req['experimentID'].has_key('localname'):
1234            overwrite = False
1235            eid = req['experimentID']['localname']
1236            # If there's an old failed experiment here with the same local name
1237            # and accessible by this user, we'll overwrite it, otherwise we'll
1238            # fall through and do the collision avoidance.
1239            old_expid = self.get_experiment_fedid(eid)
1240            if old_expid and self.check_experiment_access(fid, old_expid):
1241                self.state_lock.acquire()
1242                status = self.state[eid].get('experimentStatus', None)
1243                if status and status == 'failed':
1244                    # remove the old access attribute
1245                    self.auth.unset_attribute(fid, old_expid)
1246                    overwrite = True
1247                    del self.state[eid]
1248                    del self.state[old_expid]
1249                self.state_lock.release()
1250            self.state_lock.acquire()
1251            while (self.state.has_key(eid) and not overwrite):
1252                eid += random.choice(string.ascii_letters)
1253            # Initial state
1254            self.state[eid] = {
1255                    'experimentID' : \
1256                            [ { 'localname' : eid }, {'fedid': expid } ],
1257                    'experimentStatus': state,
1258                    'experimentAccess': { 'X509' : expcert },
1259                    'owner': fid,
1260                    'log' : [],
1261                }
1262            self.state[expid] = self.state[eid]
1263            if self.state_filename: self.write_state()
1264            self.state_lock.release()
1265        else:
1266            eid = self.exp_stem
1267            for i in range(0,5):
1268                eid += random.choice(string.ascii_letters)
1269            self.state_lock.acquire()
1270            while (self.state.has_key(eid)):
1271                eid = self.exp_stem
1272                for i in range(0,5):
1273                    eid += random.choice(string.ascii_letters)
1274            # Initial state
1275            self.state[eid] = {
1276                    'experimentID' : \
1277                            [ { 'localname' : eid }, {'fedid': expid } ],
1278                    'experimentStatus': state,
1279                    'experimentAccess': { 'X509' : expcert },
1280                    'owner': fid,
1281                    'log' : [],
1282                }
1283            self.state[expid] = self.state[eid]
1284            if self.state_filename: self.write_state()
1285            self.state_lock.release()
1286
1287        return eid
1288
1289
1290    def allocate_ips_to_topo(self, top):
1291        """
1292        Add an ip4_address attribute to all the hosts in the topology, based on
1293        the shared substrates on which they sit.  An /etc/hosts file is also
1294        created and returned as a list of hostfiles entries.  We also return
1295        the allocator, because we may need to allocate IPs to portals
1296        (specifically DRAGON portals).
1297        """
1298        subs = sorted(top.substrates, 
1299                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1300                reverse=True)
1301        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1302        ifs = { }
1303        hosts = [ ]
1304
1305        for idx, s in enumerate(subs):
1306            a = ips.allocate(len(s.interfaces)+2)
1307            if a :
1308                base, num = a
1309                if num < len(s.interfaces) +2 : 
1310                    raise service_error(service_error.internal,
1311                            "Allocator returned wrong number of IPs??")
1312            else:
1313                raise service_error(service_error.req, 
1314                        "Cannot allocate IP addresses")
1315
1316            base += 1
1317            for i in s.interfaces:
1318                i.attribute.append(
1319                        topdl.Attribute('ip4_address', 
1320                            "%s" % ip_addr(base)))
1321                hname = i.element.name[0]
1322                if ifs.has_key(hname):
1323                    hosts.append("%s\t%s-%s %s-%d" % \
1324                            (ip_addr(base), hname, s.name, hname,
1325                                ifs[hname]))
1326                else:
1327                    ifs[hname] = 0
1328                    hosts.append("%s\t%s-%s %s-%d %s" % \
1329                            (ip_addr(base), hname, s.name, hname,
1330                                ifs[hname], hname))
1331
1332                ifs[hname] += 1
1333                base += 1
1334        return hosts, ips
1335
1336    def get_access_to_testbeds(self, testbeds, user, access_user, 
1337            export_project, master, allocated, tbparams):
1338        """
1339        Request access to the various testbeds required for this instantiation
1340        (passed in as testbeds).  User, access_user, expoert_project and master
1341        are used to construct the correct requests.  Per-testbed parameters are
1342        returned in tbparams.
1343        """
1344        for tb in testbeds:
1345            self.get_access(tb, None, user, tbparams, master,
1346                    export_project, access_user)
1347            allocated[tb] = 1
1348
1349    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1350        """
1351        Create the sub-topologies that are needed for experimetn instantiation.
1352        Along the way attach startup commands to the computers in the
1353        subtopologies.
1354        """
1355        for tb in testbeds:
1356            topo[tb] = top.clone()
1357            to_delete = [ ]
1358            for e in topo[tb].elements:
1359                etb = e.get_attribute('testbed')
1360                if etb and etb != tb:
1361                    for i in e.interface:
1362                        for s in i.subs:
1363                            try:
1364                                s.interfaces.remove(i)
1365                            except ValueError:
1366                                raise service_error(service_error.internal,
1367                                        "Can't remove interface??")
1368                    to_delete.append(e)
1369            for e in to_delete:
1370                topo[tb].elements.remove(e)
1371            topo[tb].make_indices()
1372
1373            for e in [ e for e in topo[tb].elements \
1374                    if isinstance(e,topdl.Computer)]:
1375                if tb == master:
1376                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1377                else:
1378                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1379                scmd = e.get_attribute('startup')
1380                if scmd:
1381                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1382
1383                e.set_attribute('startup', cmd)
1384                if self.fedkit: self.add_kit(e, self.fedkit)
1385
1386    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1387            portal_type, iface_desc=()):
1388        sproject = tbparams[st].get('project', 'project')
1389        dproject = tbparams[dt].get('project', 'project')
1390        mproject = tbparams[master].get('project', 'project')
1391        sdomain = tbparams[st].get('domain', ".example.com")
1392        ddomain = tbparams[dt].get('domain', ".example.com")
1393        mdomain = tbparams[master].get('domain', '.example.com')
1394        muser = tbparams[master].get('user', 'root')
1395        smbshare = tbparams[master].get('smbshare', 'USERS')
1396        aid = tbparams[dt]['allocID']['fedid']
1397        if st == master or dt == master:
1398            active = ("%s" % (st == master))
1399        else:
1400            active = ("%s" %(st > dt))
1401
1402        ifaces = [ ]
1403        for sub, attrs in iface_desc:
1404            inf = topdl.Interface(
1405                    substrate=sub,
1406                    attribute=[
1407                        topdl.Attribute(
1408                            attribute=n,
1409                            value = v)
1410                        for n, v in attrs
1411                        ]
1412                    )
1413            ifaces.append(inf)
1414        return topdl.Computer(
1415                name=myname,
1416                attribute=[ 
1417                    topdl.Attribute(attribute=n,value=v)
1418                        for n, v in (\
1419                            ('portal', 'true'),
1420                            ('domain', sdomain),
1421                            ('masterdomain', mdomain),
1422                            ('masterexperiment', "%s/%s" % \
1423                                    (mproject, eid)),
1424                            ('masteruser', muser),
1425                            ('smbshare', smbshare),
1426                            ('experiment', "%s/%s" % \
1427                                    (sproject, eid)),
1428                            ('peer', "%s" % desthost),
1429                            ('peer_segment', "%s" % aid),
1430                            ('scriptdir', 
1431                                "/usr/local/federation/bin"),
1432                            ('active', "%s" % active),
1433                            ('portal_type', portal_type), 
1434                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1435                    ],
1436                interface=ifaces,
1437                )
1438
1439    def new_portal_substrate(self, st, dt, eid, tbparams):
1440        ddomain = tbparams[dt].get('domain', ".example.com")
1441        dproject = tbparams[dt].get('project', 'project')
1442        tsubstrate = \
1443                topdl.Substrate(name='%s-%s' % (st, dt),
1444                        attribute= [
1445                            topdl.Attribute(
1446                                attribute='portal',
1447                                value='true')
1448                            ]
1449                        )
1450        segment_element = topdl.Segment(
1451                id= tbparams[dt]['allocID'],
1452                type='emulab',
1453                uri = self.tbmap.get(dt, None),
1454                interface=[ 
1455                    topdl.Interface(
1456                        substrate=tsubstrate.name),
1457                    ],
1458                attribute = [
1459                    topdl.Attribute(attribute=n, value=v)
1460                        for n, v in (\
1461                            ('domain', ddomain),
1462                            ('experiment', "%s/%s" % \
1463                                    (dproject, eid)),)
1464                    ],
1465                )
1466
1467        return (tsubstrate, segment_element)
1468
1469    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1470        if sub.capacity is None:
1471            raise service_error(service_error.internal,
1472                    "Cannot DRAGON split substrate w/o capacity")
1473        segs = [ ]
1474        substr = topdl.Substrate(name="dragon%d" % idx, 
1475                capacity=sub.capacity.clone(),
1476                attribute=[ topdl.Attribute(attribute=n, value=v)
1477                    for n, v, in (\
1478                            ('vlan', 'unassigned%d' % idx),)])
1479        for tb in tbs.keys():
1480            seg = topdl.Segment(
1481                    id = tbparams[tb]['allocID'],
1482                    type='emulab',
1483                    uri = self.tbmap.get(tb, None),
1484                    interface=[ 
1485                        topdl.Interface(
1486                            substrate=substr.name),
1487                        ],
1488                    attribute=[ topdl.Attribute(
1489                        attribute='dragon_endpoint', 
1490                        value=tbparams[tb]['dragon']),
1491                        ]
1492                    )
1493            if tbparams[tb].has_key('vlans'):
1494                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1495            segs.append(seg)
1496
1497        topo["dragon%d" %idx] = \
1498                topdl.Topology(substrates=[substr], elements=segs,
1499                        attribute=[
1500                            topdl.Attribute(attribute="transit", value='true'),
1501                            topdl.Attribute(attribute="dynamic", value='true'),
1502                            topdl.Attribute(attribute="testbed", value='dragon'),
1503                            ]
1504                        )
1505
1506    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1507        """
1508        Add attribiutes to the various elements indicating that they are to be
1509        dragon connected and create a dragon segment in tops to be
1510        instantiated.
1511        """
1512
1513        def get_substrate_from_topo(name, t):
1514            for s in t.substrates:
1515                if s.name == name: return s
1516            else: return None
1517
1518        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1519        elements = [ i.element for i in sub.interfaces ]
1520        count = { }
1521        for e in elements:
1522            tb = e.get_attribute('testbed')
1523            count[tb] = count.get(tb, 0) + 1
1524
1525        for tb in tbs.keys():
1526            s = get_substrate_from_topo(sub.name, topo[tb])
1527            if s:
1528                for i in s.interfaces:
1529                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1530                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1531                    else: i.set_attribute('dragon_type', 'link')
1532            else:
1533                raise service_error(service_error.internal,
1534                        "No substrate %s in testbed %s" % (sub.name, tb))
1535
1536        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1537
1538    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1539            segment_substrate, portals):
1540        # More than one testbed is on this substrate.  Insert
1541        # some portals into the subtopologies.  st == source testbed,
1542        # dt == destination testbed.
1543        for st in tbs.keys():
1544            if not segment_substrate.has_key(st):
1545                segment_substrate[st] = { }
1546            if not portals.has_key(st): 
1547                portals[st] = { }
1548            for dt in [ t for t in tbs.keys() if t != st]:
1549                sproject = tbparams[st].get('project', 'project')
1550                dproject = tbparams[dt].get('project', 'project')
1551                mproject = tbparams[master].get('project', 'project')
1552                sdomain = tbparams[st].get('domain', ".example.com")
1553                ddomain = tbparams[dt].get('domain', ".example.com")
1554                mdomain = tbparams[master].get('domain', '.example.com')
1555                muser = tbparams[master].get('user', 'root')
1556                smbshare = tbparams[master].get('smbshare', 'USERS')
1557                aid = tbparams[dt]['allocID']['fedid']
1558                if st == master or dt == master:
1559                    active = ("%s" % (st == master))
1560                else:
1561                    active = ("%s" %(st > dt))
1562                if not segment_substrate[st].has_key(dt):
1563                    # Put a substrate and a segment for the connected
1564                    # testbed in there.
1565                    tsubstrate, segment_element = \
1566                            self.new_portal_substrate(st, dt, eid, tbparams)
1567                    segment_substrate[st][dt] = tsubstrate
1568                    topo[st].substrates.append(tsubstrate)
1569                    topo[st].elements.append(segment_element)
1570
1571                new_portal = False
1572                if portals[st].has_key(dt):
1573                    # There's a portal set up to go to this destination.
1574                    # See if there's room to multiples this connection on
1575                    # it.  If so, add an interface to the portal; if not,
1576                    # set up to add a portal below.
1577                    # [This little festival of braces is just a pop of the
1578                    # last element in the list of portals between st and
1579                    # dt.]
1580                    portal = portals[st][dt][-1]
1581                    mux = len([ i for i in portal.interface \
1582                            if not i.get_attribute('portal')])
1583                    if mux == self.muxmax:
1584                        new_portal = True
1585                        portal_type = "experiment"
1586                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1587                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1588                    else:
1589                        new_i = topdl.Interface(
1590                                substrate=sub.name,
1591                                attribute=[ 
1592                                    topdl.Attribute(
1593                                        attribute='ip4_address', 
1594                                        value=tbs[dt]
1595                                    )
1596                                ])
1597                        portal.interface.append(new_i)
1598                else:
1599                    # First connection to this testbed, make an empty list
1600                    # and set up to add the new portal below
1601                    new_portal = True
1602                    portals[st][dt] = [ ]
1603                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1604                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1605
1606                    if dt == master or st == master: portal_type = "both"
1607                    else: portal_type = "experiment"
1608
1609                if new_portal:
1610                    infs = (
1611                            (segment_substrate[st][dt].name, 
1612                                (('portal', 'true'),)),
1613                            (sub.name, 
1614                                (('ip4_address', tbs[dt]),))
1615                        )
1616                    portal =  self.new_portal_node(st, dt, tbparams, 
1617                            master, eid, myname, desthost, portal_type,
1618                            infs)
1619                    if self.fedkit:
1620                        self.add_kit(portal, self.fedkit)
1621                    if self.gatewaykit: 
1622                        self.add_kit(portal, self.gatewaykit)
1623
1624                    topo[st].elements.append(portal)
1625                    portals[st][dt].append(portal)
1626
1627    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1628        # Add to the master testbed
1629        tsubstrate, segment_element = \
1630                self.new_portal_substrate(st, dt, eid, tbparams)
1631        myname = "%stunnel" % dt
1632        desthost = "%stunnel" % st
1633
1634        portal = self.new_portal_node(st, dt, tbparams, master,
1635                eid, myname, desthost, "control", 
1636                ((tsubstrate.name,(('portal','true'),)),))
1637        if self.fedkit:
1638            self.add_kit(portal, self.fedkit)
1639        if self.gatewaykit: 
1640            self.add_kit(portal, self.gatewaykit)
1641
1642        topo[st].substrates.append(tsubstrate)
1643        topo[st].elements.append(segment_element)
1644        topo[st].elements.append(portal)
1645
1646    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1647            substrate, tbparams):
1648        # Add to the master testbed
1649        myname = "%stunnel" % dt
1650        desthost = "%s" % ip_addr(dip)
1651
1652        portal = self.new_portal_node(st, dt, tbparams, master,
1653                eid, myname, desthost, "control", 
1654                ((substrate.name,(
1655                    ('portal','true'),
1656                    ('ip4_address', "%s" % ip_addr(myip)),
1657                    ('dragon_vlan', 'unassigned%d' % idx),
1658                    ('dragon_type', 'link'),)),))
1659        if self.fedkit:
1660            self.add_kit(portal, self.fedkit)
1661        if self.gatewaykit: 
1662            self.add_kit(portal, self.gatewaykit)
1663
1664        return portal
1665
1666    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1667        """
1668        For each substrate in the main topology, find those that
1669        have nodes on more than one testbed.  Insert portal nodes
1670        into the copies of those substrates on the sub topologies.
1671        """
1672        segment_substrate = { }
1673        portals = { }
1674        for s in top.substrates:
1675            # tbs will contain an ip address on this subsrate that is in
1676            # each testbed.
1677            tbs = { }
1678            for i in s.interfaces:
1679                e = i.element
1680                tb = e.get_attribute('testbed')
1681                if tb and not tbs.has_key(tb):
1682                    for i in e.interface:
1683                        if s in i.subs:
1684                            tbs[tb]= i.get_attribute('ip4_address')
1685            if len(tbs) < 2:
1686                continue
1687
1688            # DRAGON will not create multi-site vlans yet
1689            if len(tbs) == 2 and \
1690                    all([tbparams[x].has_key('dragon') for x in tbs]):
1691                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1692                        master, eid)
1693            else:
1694                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1695                        eid, segment_substrate, portals)
1696
1697        # Make sure that all the slaves have a control portal back to the
1698        # master.
1699        for tb in [ t for t in tbparams.keys() if t != master ]:
1700            if len([e for e in topo[tb].elements \
1701                    if isinstance(e, topdl.Computer) and \
1702                    e.get_attribute('portal') and \
1703                    e.get_attribute('portal_type') == 'both']) == 0:
1704
1705                if tbparams[master].has_key('dragon') \
1706                        and tbparams[tb].has_key('dragon'):
1707
1708                    idx = len([x for x in topo.keys() \
1709                            if x.startswith('dragon')])
1710                    dip, leng = ip_allocator.allocate(4)
1711                    dip += 1
1712                    mip = dip+1
1713                    csub = topdl.Substrate(
1714                            name="dragon-control-%s" % tb,
1715                            capacity=topdl.Capacity(100000.0, 'max'),
1716                            attribute=[
1717                                topdl.Attribute(
1718                                    attribute='portal',
1719                                    value='true'
1720                                    )
1721                                ]
1722                            )
1723                    seg = topdl.Segment(
1724                            id= tbparams[master]['allocID'],
1725                            type='emulab',
1726                            uri = self.tbmap.get(master, None),
1727                            interface=[ 
1728                                topdl.Interface(
1729                                    substrate=csub.name),
1730                                ],
1731                            attribute = [
1732                                topdl.Attribute(attribute=n, value=v)
1733                                    for n, v in (\
1734                                        ('domain', 
1735                                            tbparams[master].get('domain',
1736                                                ".example.com")),
1737                                        ('experiment', "%s/%s" % \
1738                                                (tbparams[master].get(
1739                                                    'project', 
1740                                                    'project'), 
1741                                                    eid)),)
1742                                ],
1743                            )
1744                    topo[tb].substrates.append(csub)
1745                    topo[tb].elements.append(
1746                            self.new_dragon_portal(tb, master, master, eid, 
1747                                dip, mip, idx, csub, tbparams))
1748                    topo[tb].elements.append(seg)
1749
1750                    mcsub = csub.clone()
1751                    seg = topdl.Segment(
1752                            id= tbparams[tb]['allocID'],
1753                            type='emulab',
1754                            uri = self.tbmap.get(tb, None),
1755                            interface=[ 
1756                                topdl.Interface(
1757                                    substrate=csub.name),
1758                                ],
1759                            attribute = [
1760                                topdl.Attribute(attribute=n, value=v)
1761                                    for n, v in (\
1762                                        ('domain', 
1763                                            tbparams[tb].get('domain',
1764                                                ".example.com")),
1765                                        ('experiment', "%s/%s" % \
1766                                                (tbparams[tb].get('project', 
1767                                                    'project'), 
1768                                                    eid)),)
1769                                ],
1770                            )
1771                    topo[master].substrates.append(mcsub)
1772                    topo[master].elements.append(
1773                            self.new_dragon_portal(master, tb, master, eid, 
1774                                mip, dip, idx, mcsub, tbparams))
1775                    topo[master].elements.append(seg)
1776
1777                    self.create_dragon_substrate(csub, topo, 
1778                            {tb: 1, master:1}, tbparams, master, eid)
1779                else:
1780                    self.add_control_portal(master, tb, master, eid, topo, 
1781                            tbparams)
1782                    self.add_control_portal(tb, master, master, eid, topo, 
1783                            tbparams)
1784
1785        # Connect the portal nodes into the topologies and clear out
1786        # substrates that are not in the topologies
1787        for tb in tbparams.keys():
1788            topo[tb].incorporate_elements()
1789            topo[tb].substrates = \
1790                    [s for s in topo[tb].substrates \
1791                        if len(s.interfaces) >0]
1792
1793    def wrangle_software(self, expid, top, topo, tbparams):
1794        """
1795        Copy software out to the repository directory, allocate permissions and
1796        rewrite the segment topologies to look for the software in local
1797        places.
1798        """
1799
1800        # Copy the rpms and tarfiles to a distribution directory from
1801        # which the federants can retrieve them
1802        linkpath = "%s/software" %  expid
1803        softdir ="%s/%s" % ( self.repodir, linkpath)
1804        softmap = { }
1805        # These are in a list of tuples format (each kit).  This comprehension
1806        # unwraps them into a single list of tuples that initilaizes the set of
1807        # tuples.
1808        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1809                for p, t in l ])
1810        pkgs.update([x.location for e in top.elements \
1811                for x in e.software])
1812        try:
1813            os.makedirs(softdir)
1814        except IOError, e:
1815            raise service_error(
1816                    "Cannot create software directory: %s" % e)
1817        # The actual copying.  Everything's converted into a url for copying.
1818        for pkg in pkgs:
1819            loc = pkg
1820
1821            scheme, host, path = urlparse(loc)[0:3]
1822            dest = os.path.basename(path)
1823            if not scheme:
1824                if not loc.startswith('/'):
1825                    loc = "/%s" % loc
1826                loc = "file://%s" %loc
1827            try:
1828                u = urlopen(loc)
1829            except Exception, e:
1830                raise service_error(service_error.req, 
1831                        "Cannot open %s: %s" % (loc, e))
1832            try:
1833                f = open("%s/%s" % (softdir, dest) , "w")
1834                self.log.debug("Writing %s/%s" % (softdir,dest) )
1835                data = u.read(4096)
1836                while data:
1837                    f.write(data)
1838                    data = u.read(4096)
1839                f.close()
1840                u.close()
1841            except Exception, e:
1842                raise service_error(service_error.internal,
1843                        "Could not copy %s: %s" % (loc, e))
1844            path = re.sub("/tmp", "", linkpath)
1845            # XXX
1846            softmap[pkg] = \
1847                    "%s/%s/%s" %\
1848                    ( self.repo_url, path, dest)
1849
1850            # Allow the individual segments to access the software.
1851            for tb in tbparams.keys():
1852                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1853                        "/%s/%s" % ( path, dest))
1854
1855        # Convert the software locations in the segments into the local
1856        # copies on this host
1857        for soft in [ s for tb in topo.values() \
1858                for e in tb.elements \
1859                    if getattr(e, 'software', False) \
1860                        for s in e.software ]:
1861            if softmap.has_key(soft.location):
1862                soft.location = softmap[soft.location]
1863
1864
1865    def new_experiment(self, req, fid):
1866        """
1867        The external interface to empty initial experiment creation called from
1868        the dispatcher.
1869
1870        Creates a working directory, splits the incoming description using the
1871        splitter script and parses out the avrious subsections using the
1872        lcasses above.  Once each sub-experiment is created, use pooled threads
1873        to instantiate them and start it all up.
1874        """
1875        if not self.auth.check_attribute(fid, 'new'):
1876            raise service_error(service_error.access, "New access denied")
1877
1878        try:
1879            tmpdir = tempfile.mkdtemp(prefix="split-")
1880        except IOError:
1881            raise service_error(service_error.internal, "Cannot create tmp dir")
1882
1883        try:
1884            access_user = self.accessdb[fid]
1885        except KeyError:
1886            raise service_error(service_error.internal,
1887                    "Access map and authorizer out of sync in " + \
1888                            "new_experiment for fedid %s"  % fid)
1889
1890        pid = "dummy"
1891        gid = "dummy"
1892
1893        req = req.get('NewRequestBody', None)
1894        if not req:
1895            raise service_error(service_error.req,
1896                    "Bad request format (no NewRequestBody)")
1897
1898        # Generate an ID for the experiment (slice) and a certificate that the
1899        # allocator can use to prove they own it.  We'll ship it back through
1900        # the encrypted connection.
1901        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1902
1903        #now we're done with the tmpdir, and it should be empty
1904        if self.cleanup:
1905            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1906            os.rmdir(tmpdir)
1907        else:
1908            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1909
1910        eid = self.create_experiment_state(fid, req, expid, expcert, 
1911                state='empty')
1912
1913        # Let users touch the state
1914        self.auth.set_attribute(fid, expid)
1915        self.auth.set_attribute(expid, expid)
1916        # Override fedids can manipulate state as well
1917        for o in self.overrides:
1918            self.auth.set_attribute(o, expid)
1919
1920        rv = {
1921                'experimentID': [
1922                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1923                ],
1924                'experimentStatus': 'empty',
1925                'experimentAccess': { 'X509' : expcert }
1926            }
1927
1928        return rv
1929
1930
1931    def create_experiment(self, req, fid):
1932        """
1933        The external interface to experiment creation called from the
1934        dispatcher.
1935
1936        Creates a working directory, splits the incoming description using the
1937        splitter script and parses out the avrious subsections using the
1938        lcasses above.  Once each sub-experiment is created, use pooled threads
1939        to instantiate them and start it all up.
1940        """
1941
1942        req = req.get('CreateRequestBody', None)
1943        if not req:
1944            raise service_error(service_error.req,
1945                    "Bad request format (no CreateRequestBody)")
1946
1947        # Get the experiment access
1948        exp = req.get('experimentID', None)
1949        if exp:
1950            if exp.has_key('fedid'):
1951                key = exp['fedid']
1952                expid = key
1953                eid = None
1954            elif exp.has_key('localname'):
1955                key = exp['localname']
1956                eid = key
1957                expid = None
1958            else:
1959                raise service_error(service_error.req, "Unknown lookup type")
1960        else:
1961            raise service_error(service_error.req, "No request?")
1962
1963        self.check_experiment_access(fid, key)
1964
1965        try:
1966            tmpdir = tempfile.mkdtemp(prefix="split-")
1967            os.mkdir(tmpdir+"/keys")
1968        except IOError:
1969            raise service_error(service_error.internal, "Cannot create tmp dir")
1970
1971        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1972        gw_secretkey_base = "fed.%s" % self.ssh_type
1973        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1974        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1975        tclfile = tmpdir + "/experiment.tcl"
1976        tbparams = { }
1977        try:
1978            access_user = self.accessdb[fid]
1979        except KeyError:
1980            raise service_error(service_error.internal,
1981                    "Access map and authorizer out of sync in " + \
1982                            "create_experiment for fedid %s"  % fid)
1983
1984        pid = "dummy"
1985        gid = "dummy"
1986
1987        # The tcl parser needs to read a file so put the content into that file
1988        descr=req.get('experimentdescription', None)
1989        if descr:
1990            file_content=descr.get('ns2description', None)
1991            if file_content:
1992                try:
1993                    f = open(tclfile, 'w')
1994                    f.write(file_content)
1995                    f.close()
1996                except IOError:
1997                    raise service_error(service_error.internal,
1998                            "Cannot write temp experiment description")
1999            else:
2000                raise service_error(service_error.req, 
2001                        "Only ns2descriptions supported")
2002        else:
2003            raise service_error(service_error.req, "No experiment description")
2004
2005        self.state_lock.acquire()
2006        if self.state.has_key(key):
2007            for e in self.state[key].get('experimentID',[]):
2008                if not expid and e.has_key('fedid'):
2009                    expid = e['fedid']
2010                elif not eid and e.has_key('localname'):
2011                    eid = e['localname']
2012        self.state_lock.release()
2013
2014        if not (eid and expid):
2015            raise service_error(service_error.internal, 
2016                    "Cannot find local experiment info!?")
2017
2018        try: 
2019            # This catches exceptions to clear the placeholder if necessary
2020            try:
2021                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2022            except ValueError:
2023                raise service_error(service_error.server_config, 
2024                        "Bad key type (%s)" % self.ssh_type)
2025
2026            user = req.get('user', None)
2027            if user == None:
2028                raise service_error(service_error.req, "No user")
2029
2030            master = req.get('master', None)
2031            if not master:
2032                raise service_error(service_error.req,
2033                        "No master testbed label")
2034            export_project = req.get('exportProject', None)
2035            if not export_project:
2036                raise service_error(service_error.req, "No export project")
2037           
2038            # Translate to topdl
2039            if self.splitter_url:
2040                # XXX: need remote topdl translator
2041                self.log.debug("Calling remote splitter at %s" % \
2042                        self.splitter_url)
2043                split_data = self.remote_splitter(self.splitter_url,
2044                        file_content, master)
2045            else:
2046                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2047                    str(self.muxmax), '-m', master]
2048
2049                if self.fedkit:
2050                    tclcmd.append('-k')
2051
2052                if self.gatewaykit:
2053                    tclcmd.append('-K')
2054
2055                tclcmd.extend([pid, gid, eid, tclfile])
2056
2057                self.log.debug("running local splitter %s", " ".join(tclcmd))
2058                # This is just fantastic.  As a side effect the parser copies
2059                # tb_compat.tcl into the current directory, so that directory
2060                # must be writable by the fedd user.  Doing this in the
2061                # temporary subdir ensures this is the case.
2062                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2063                        cwd=tmpdir)
2064                split_data = tclparser.stdout
2065
2066            top = topdl.topology_from_xml(file=split_data, top="experiment")
2067
2068            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2069             # Find the testbeds to look up
2070            testbeds = set([ a.value for e in top.elements \
2071                    for a in e.attribute \
2072                    if a.attribute == 'testbed'] )
2073
2074            allocated = { }         # Testbeds we can access
2075            topo ={ }               # Sub topologies
2076            self.get_access_to_testbeds(testbeds, user, access_user, 
2077                    export_project, master, allocated, tbparams)
2078            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2079
2080            # Copy configuration files into the remote file store
2081            # The config urlpath
2082            configpath = "/%s/config" % expid
2083            # The config file system location
2084            configdir ="%s%s" % ( self.repodir, configpath)
2085            try:
2086                os.makedirs(configdir)
2087            except IOError, e:
2088                raise service_error(
2089                        "Cannot create config directory: %s" % e)
2090            try:
2091                f = open("%s/hosts" % configdir, "w")
2092                f.write('\n'.join(hosts))
2093                f.close()
2094            except IOError, e:
2095                raise service_error(service_error.internal, 
2096                        "Cannot write hosts file: %s" % e)
2097            try:
2098                copy_file("%s" % gw_pubkey, "%s/%s" % \
2099                        (configdir, gw_pubkey_base))
2100                copy_file("%s" % gw_secretkey, "%s/%s" % \
2101                        (configdir, gw_secretkey_base))
2102            except IOError, e:
2103                raise service_error(service_error.internal, 
2104                        "Cannot copy keyfiles: %s" % e)
2105
2106            # Allow the individual testbeds to access the configuration files.
2107            for tb in tbparams.keys():
2108                asignee = tbparams[tb]['allocID']['fedid']
2109                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2110                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2111
2112            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2113            # Now get access to the dynamic testbeds
2114            for k, t in topo.items():
2115                if not t.get_attribute('dynamic'):
2116                    continue
2117                tb = t.get_attribute('testbed')
2118                if tb: 
2119                    self.get_access(tb, None, user, tbparams, master, 
2120                            export_project, access_user)
2121                    tbparams[k] = tbparams[tb]
2122                    del tbparams[tb]
2123                    allocated[k] = 1
2124                else:
2125                    raise service_error(service_error.internal, 
2126                            "Dynamic allocation from no testbed!?")
2127
2128            self.wrangle_software(expid, top, topo, tbparams)
2129
2130            vtopo = topdl.topology_to_vtopo(top)
2131            vis = self.genviz(vtopo)
2132
2133            # save federant information
2134            for k in allocated.keys():
2135                tbparams[k]['federant'] = {
2136                        'name': [ { 'localname' : eid} ],
2137                        'allocID' : tbparams[k]['allocID'],
2138                        'master' : k == master,
2139                        'uri': tbparams[k]['uri'],
2140                    }
2141                if tbparams[k].has_key('emulab'):
2142                        tbparams[k]['federant']['emulab'] = \
2143                                tbparams[k]['emulab']
2144
2145            self.state_lock.acquire()
2146            self.state[eid]['vtopo'] = vtopo
2147            self.state[eid]['vis'] = vis
2148            self.state[expid]['federant'] = \
2149                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2150                        if tbparams[tb].has_key('federant') ]
2151            if self.state_filename: 
2152                self.write_state()
2153            self.state_lock.release()
2154        except service_error, e:
2155            # If something goes wrong in the parse (usually an access error)
2156            # clear the placeholder state.  From here on out the code delays
2157            # exceptions.  Failing at this point returns a fault to the remote
2158            # caller.
2159
2160            self.state_lock.acquire()
2161            del self.state[eid]
2162            del self.state[expid]
2163            if self.state_filename: self.write_state()
2164            self.state_lock.release()
2165            raise e
2166
2167
2168        # Start the background swapper and return the starting state.  From
2169        # here on out, the state will stick around a while.
2170
2171        # Let users touch the state
2172        self.auth.set_attribute(fid, expid)
2173        self.auth.set_attribute(expid, expid)
2174        # Override fedids can manipulate state as well
2175        for o in self.overrides:
2176            self.auth.set_attribute(o, expid)
2177
2178        # Create a logger that logs to the experiment's state object as well as
2179        # to the main log file.
2180        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2181        alloc_collector = self.list_log(self.state[eid]['log'])
2182        h = logging.StreamHandler(alloc_collector)
2183        # XXX: there should be a global one of these rather than repeating the
2184        # code.
2185        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2186                    '%d %b %y %H:%M:%S'))
2187        alloc_log.addHandler(h)
2188       
2189        attrs = [ 
2190                {
2191                    'attribute': 'ssh_pubkey', 
2192                    'value': '%s/%s/config/%s' % \
2193                            (self.repo_url, expid, gw_pubkey_base)
2194                },
2195                {
2196                    'attribute': 'ssh_secretkey', 
2197                    'value': '%s/%s/config/%s' % \
2198                            (self.repo_url, expid, gw_secretkey_base)
2199                },
2200                {
2201                    'attribute': 'hosts', 
2202                    'value': '%s/%s/config/hosts' % \
2203                            (self.repo_url, expid)
2204                },
2205                {
2206                    'attribute': 'experiment_name',
2207                    'value': eid,
2208                },
2209            ]
2210
2211        # Start a thread to do the resource allocation
2212        t  = Thread(target=self.allocate_resources,
2213                args=(allocated, master, eid, expid, tbparams, 
2214                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2215                name=eid)
2216        t.start()
2217
2218        rv = {
2219                'experimentID': [
2220                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2221                ],
2222                'experimentStatus': 'starting',
2223            }
2224
2225        return rv
2226   
2227    def get_experiment_fedid(self, key):
2228        """
2229        find the fedid associated with the localname key in the state database.
2230        """
2231
2232        rv = None
2233        self.state_lock.acquire()
2234        if self.state.has_key(key):
2235            if isinstance(self.state[key], dict):
2236                try:
2237                    kl = [ f['fedid'] for f in \
2238                            self.state[key]['experimentID']\
2239                                if f.has_key('fedid') ]
2240                except KeyError:
2241                    self.state_lock.release()
2242                    raise service_error(service_error.internal, 
2243                            "No fedid for experiment %s when getting "+\
2244                                    "fedid(!?)" % key)
2245                if len(kl) == 1:
2246                    rv = kl[0]
2247                else:
2248                    self.state_lock.release()
2249                    raise service_error(service_error.internal, 
2250                            "multiple fedids for experiment %s when " +\
2251                                    "getting fedid(!?)" % key)
2252            else:
2253                self.state_lock.release()
2254                raise service_error(service_error.internal, 
2255                        "Unexpected state for %s" % key)
2256        self.state_lock.release()
2257        return rv
2258
2259    def check_experiment_access(self, fid, key):
2260        """
2261        Confirm that the fid has access to the experiment.  Though a request
2262        may be made in terms of a local name, the access attribute is always
2263        the experiment's fedid.
2264        """
2265        if not isinstance(key, fedid):
2266            key = self.get_experiment_fedid(key)
2267
2268        if self.auth.check_attribute(fid, key):
2269            return True
2270        else:
2271            raise service_error(service_error.access, "Access Denied")
2272
2273
2274    def get_handler(self, path, fid):
2275        self.log.info("Get handler %s %s" % (path, fid))
2276        if self.auth.check_attribute(fid, path):
2277            return ("%s/%s" % (self.repodir, path), "application/binary")
2278        else:
2279            return (None, None)
2280
2281    def get_vtopo(self, req, fid):
2282        """
2283        Return the stored virtual topology for this experiment
2284        """
2285        rv = None
2286        state = None
2287
2288        req = req.get('VtopoRequestBody', None)
2289        if not req:
2290            raise service_error(service_error.req,
2291                    "Bad request format (no VtopoRequestBody)")
2292        exp = req.get('experiment', None)
2293        if exp:
2294            if exp.has_key('fedid'):
2295                key = exp['fedid']
2296                keytype = "fedid"
2297            elif exp.has_key('localname'):
2298                key = exp['localname']
2299                keytype = "localname"
2300            else:
2301                raise service_error(service_error.req, "Unknown lookup type")
2302        else:
2303            raise service_error(service_error.req, "No request?")
2304
2305        self.check_experiment_access(fid, key)
2306
2307        self.state_lock.acquire()
2308        if self.state.has_key(key):
2309            if self.state[key].has_key('vtopo'):
2310                rv = { 'experiment' : {keytype: key },\
2311                        'vtopo': self.state[key]['vtopo'],\
2312                    }
2313            else:
2314                state = self.state[key]['experimentStatus']
2315        self.state_lock.release()
2316
2317        if rv: return rv
2318        else: 
2319            if state:
2320                raise service_error(service_error.partial, 
2321                        "Not ready: %s" % state)
2322            else:
2323                raise service_error(service_error.req, "No such experiment")
2324
2325    def get_vis(self, req, fid):
2326        """
2327        Return the stored visualization for this experiment
2328        """
2329        rv = None
2330        state = None
2331
2332        req = req.get('VisRequestBody', None)
2333        if not req:
2334            raise service_error(service_error.req,
2335                    "Bad request format (no VisRequestBody)")
2336        exp = req.get('experiment', None)
2337        if exp:
2338            if exp.has_key('fedid'):
2339                key = exp['fedid']
2340                keytype = "fedid"
2341            elif exp.has_key('localname'):
2342                key = exp['localname']
2343                keytype = "localname"
2344            else:
2345                raise service_error(service_error.req, "Unknown lookup type")
2346        else:
2347            raise service_error(service_error.req, "No request?")
2348
2349        self.check_experiment_access(fid, key)
2350
2351        self.state_lock.acquire()
2352        if self.state.has_key(key):
2353            if self.state[key].has_key('vis'):
2354                rv =  { 'experiment' : {keytype: key },\
2355                        'vis': self.state[key]['vis'],\
2356                        }
2357            else:
2358                state = self.state[key]['experimentStatus']
2359        self.state_lock.release()
2360
2361        if rv: return rv
2362        else:
2363            if state:
2364                raise service_error(service_error.partial, 
2365                        "Not ready: %s" % state)
2366            else:
2367                raise service_error(service_error.req, "No such experiment")
2368
2369    def clean_info_response(self, rv):
2370        """
2371        Remove the information in the experiment's state object that is not in
2372        the info response.
2373        """
2374        # Remove the owner info (should always be there, but...)
2375        if rv.has_key('owner'): del rv['owner']
2376
2377        # Convert the log into the allocationLog parameter and remove the
2378        # log entry (with defensive programming)
2379        if rv.has_key('log'):
2380            rv['allocationLog'] = "".join(rv['log'])
2381            del rv['log']
2382        else:
2383            rv['allocationLog'] = ""
2384
2385        if rv['experimentStatus'] != 'active':
2386            if rv.has_key('federant'): del rv['federant']
2387        else:
2388            # remove the allocationID and uri info from each federant
2389            for f in rv.get('federant', []):
2390                if f.has_key('allocID'): del f['allocID']
2391                if f.has_key('uri'): del f['uri']
2392        return rv
2393
2394    def get_info(self, req, fid):
2395        """
2396        Return all the stored info about this experiment
2397        """
2398        rv = None
2399
2400        req = req.get('InfoRequestBody', None)
2401        if not req:
2402            raise service_error(service_error.req,
2403                    "Bad request format (no InfoRequestBody)")
2404        exp = req.get('experiment', None)
2405        if exp:
2406            if exp.has_key('fedid'):
2407                key = exp['fedid']
2408                keytype = "fedid"
2409            elif exp.has_key('localname'):
2410                key = exp['localname']
2411                keytype = "localname"
2412            else:
2413                raise service_error(service_error.req, "Unknown lookup type")
2414        else:
2415            raise service_error(service_error.req, "No request?")
2416
2417        self.check_experiment_access(fid, key)
2418
2419        # The state may be massaged by the service function that called
2420        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2421        # state.
2422        self.state_lock.acquire()
2423        if self.state.has_key(key):
2424            rv = copy.deepcopy(self.state[key])
2425        self.state_lock.release()
2426
2427        if rv:
2428            return self.clean_info_response(rv)
2429        else:
2430            raise service_error(service_error.req, "No such experiment")
2431
2432    def get_multi_info(self, req, fid):
2433        """
2434        Return all the stored info that this fedid can access
2435        """
2436        rv = { 'info': [ ] }
2437
2438        self.state_lock.acquire()
2439        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2440            try:
2441                self.check_experiment_access(fid, key)
2442            except service_error, e:
2443                if e.code == service_error.access:
2444                    continue
2445                else:
2446                    self.state_lock.release()
2447                    raise e
2448
2449            if self.state.has_key(key):
2450                e = copy.deepcopy(self.state[key])
2451                e = self.clean_info_response(e)
2452                rv['info'].append(e)
2453        self.state_lock.release()
2454        return rv
2455
2456    def terminate_experiment(self, req, fid):
2457        """
2458        Swap this experiment out on the federants and delete the shared
2459        information
2460        """
2461        tbparams = { }
2462        req = req.get('TerminateRequestBody', None)
2463        if not req:
2464            raise service_error(service_error.req,
2465                    "Bad request format (no TerminateRequestBody)")
2466        force = req.get('force', False)
2467        exp = req.get('experiment', None)
2468        if exp:
2469            if exp.has_key('fedid'):
2470                key = exp['fedid']
2471                keytype = "fedid"
2472            elif exp.has_key('localname'):
2473                key = exp['localname']
2474                keytype = "localname"
2475            else:
2476                raise service_error(service_error.req, "Unknown lookup type")
2477        else:
2478            raise service_error(service_error.req, "No request?")
2479
2480        self.check_experiment_access(fid, key)
2481
2482        dealloc_list = [ ]
2483
2484
2485        # Create a logger that logs to the dealloc_list as well as to the main
2486        # log file.
2487        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2488        h = logging.StreamHandler(self.list_log(dealloc_list))
2489        # XXX: there should be a global one of these rather than repeating the
2490        # code.
2491        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2492                    '%d %b %y %H:%M:%S'))
2493        dealloc_log.addHandler(h)
2494
2495        self.state_lock.acquire()
2496        fed_exp = self.state.get(key, None)
2497
2498        if fed_exp:
2499            # This branch of the conditional holds the lock to generate a
2500            # consistent temporary tbparams variable to deallocate experiments.
2501            # It releases the lock to do the deallocations and reacquires it to
2502            # remove the experiment state when the termination is complete.
2503
2504            # First make sure that the experiment creation is complete.
2505            status = fed_exp.get('experimentStatus', None)
2506
2507            if status:
2508                if status in ('starting', 'terminating'):
2509                    if not force:
2510                        self.state_lock.release()
2511                        raise service_error(service_error.partial, 
2512                                'Experiment still being created or destroyed')
2513                    else:
2514                        self.log.warning('Experiment in %s state ' % status + \
2515                                'being terminated by force.')
2516            else:
2517                # No status??? trouble
2518                self.state_lock.release()
2519                raise service_error(service_error.internal,
2520                        "Experiment has no status!?")
2521
2522            ids = []
2523            #  experimentID is a list of dicts that are self-describing
2524            #  identifiers.  This finds all the fedids and localnames - the
2525            #  keys of self.state - and puts them into ids.
2526            for id in fed_exp.get('experimentID', []):
2527                if id.has_key('fedid'): ids.append(id['fedid'])
2528                if id.has_key('localname'): ids.append(id['localname'])
2529
2530            # Collect the allocation/segment ids into a dict keyed by the fedid
2531            # of the allocation (or a monotonically increasing integer) that
2532            # contains a tuple of uri, aid (which is a dict...)
2533            for i, fed in enumerate(fed_exp.get('federant', [])):
2534                try:
2535                    uri = fed['uri']
2536                    aid = fed['allocID']
2537                    k = fed['allocID'].get('fedid', i)
2538                except KeyError, e:
2539                    continue
2540                tbparams[k] = (uri, aid)
2541            fed_exp['experimentStatus'] = 'terminating'
2542            if self.state_filename: self.write_state()
2543            self.state_lock.release()
2544
2545            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2546            # then completes, so we can't wait if nothing starts.  So, no
2547            # tbparams, no start.
2548            if len(tbparams) > 0:
2549                thread_pool = self.thread_pool(self.nthreads)
2550                for k in tbparams.keys():
2551                    # Create and start a thread to stop the segment
2552                    thread_pool.wait_for_slot()
2553                    uri, aid = tbparams[k]
2554                    t  = self.pooled_thread(\
2555                            target=self.terminate_segment(log=dealloc_log,
2556                                testbed=uri,
2557                                cert_file=self.cert_file, 
2558                                cert_pwd=self.cert_pwd,
2559                                trusted_certs=self.trusted_certs,
2560                                caller=self.call_TerminateSegment),
2561                            args=(uri, aid), name=k,
2562                            pdata=thread_pool, trace_file=self.trace_file)
2563                    t.start()
2564                # Wait for completions
2565                thread_pool.wait_for_all_done()
2566
2567            # release the allocations (failed experiments have done this
2568            # already, and starting experiments may be in odd states, so we
2569            # ignore errors releasing those allocations
2570            try: 
2571                for k in tbparams.keys():
2572                    # This releases access by uri
2573                    uri, aid = tbparams[k]
2574                    self.release_access(None, aid, uri=uri)
2575            except service_error, e:
2576                if status != 'failed' and not force:
2577                    raise e
2578
2579            # Remove the terminated experiment
2580            self.state_lock.acquire()
2581            for id in ids:
2582                if self.state.has_key(id): del self.state[id]
2583
2584            if self.state_filename: self.write_state()
2585            self.state_lock.release()
2586
2587            return { 
2588                    'experiment': exp , 
2589                    'deallocationLog': "".join(dealloc_list),
2590                    }
2591        else:
2592            # Don't forget to release the lock
2593            self.state_lock.release()
2594            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.