source: fedd/federation/experiment_control.py @ a3ad8bd

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since a3ad8bd was a3ad8bd, checked in by Ted Faber <faber@…>, 14 years ago

checkpoint, adding new operation - prequel to splitting the create call to allow delegation

  • Property mode set to 100644
File size: 88.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221
222        self.exp_stem = "fed-stem"
223        self.log = logging.getLogger("fedd.experiment_control")
224        set_log_level(config, "experiment_control", self.log)
225        self.muxmax = 2
226        self.nthreads = 2
227        self.randomize_experiments = False
228
229        self.splitter = None
230        self.ssh_keygen = "/usr/bin/ssh-keygen"
231        self.ssh_identity_file = None
232
233
234        self.debug = config.getboolean("experiment_control", "create_debug")
235        self.cleanup = not config.getboolean("experiment_control", 
236                "leave_tmpfiles")
237        self.state_filename = config.get("experiment_control", 
238                "experiment_state")
239        self.splitter_url = config.get("experiment_control", "splitter_uri")
240        self.fedkit = parse_tarfile_list(\
241                config.get("experiment_control", "fedkit"))
242        self.gatewaykit = parse_tarfile_list(\
243                config.get("experiment_control", "gatewaykit"))
244        accessdb_file = config.get("experiment_control", "accessdb")
245
246        self.ssh_pubkey_file = config.get("experiment_control", 
247                "ssh_pubkey_file")
248        self.ssh_privkey_file = config.get("experiment_control",
249                "ssh_privkey_file")
250        # NB for internal master/slave ops, not experiment setup
251        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
252
253        self.overrides = set([])
254        ovr = config.get('experiment_control', 'overrides')
255        if ovr:
256            for o in ovr.split(","):
257                o = o.strip()
258                if o.startswith('fedid:'): o = o[len('fedid:'):]
259                self.overrides.add(fedid(hexstr=o))
260
261        self.state = { }
262        self.state_lock = Lock()
263        self.tclsh = "/usr/local/bin/otclsh"
264        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
265                config.get("experiment_control", "tcl_splitter",
266                        "/usr/testbed/lib/ns2ir/parse.tcl")
267        mapdb_file = config.get("experiment_control", "mapdb")
268        self.trace_file = sys.stderr
269
270        self.def_expstart = \
271                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
272                "/tmp/federate";
273        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
274                "FEDDIR/hosts";
275        self.def_gwstart = \
276                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
277                "/tmp/bridge.log";
278        self.def_mgwstart = \
279                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
280                "/tmp/bridge.log";
281        self.def_gwimage = "FBSD61-TUNNEL2";
282        self.def_gwtype = "pc";
283        self.local_access = { }
284
285        if auth:
286            self.auth = auth
287        else:
288            self.log.error(\
289                    "[access]: No authorizer initialized, creating local one.")
290            auth = authorizer()
291
292
293        if self.ssh_pubkey_file:
294            try:
295                f = open(self.ssh_pubkey_file, 'r')
296                self.ssh_pubkey = f.read()
297                f.close()
298            except IOError:
299                raise service_error(service_error.internal,
300                        "Cannot read sshpubkey")
301        else:
302            raise service_error(service_error.internal, 
303                    "No SSH public key file?")
304
305        if not self.ssh_privkey_file:
306            raise service_error(service_error.internal, 
307                    "No SSH public key file?")
308
309
310        if mapdb_file:
311            self.read_mapdb(mapdb_file)
312        else:
313            self.log.warn("[experiment_control] No testbed map, using defaults")
314            self.tbmap = { 
315                    'deter':'https://users.isi.deterlab.net:23235',
316                    'emulab':'https://users.isi.deterlab.net:23236',
317                    'ucb':'https://users.isi.deterlab.net:23237',
318                    }
319
320        if accessdb_file:
321                self.read_accessdb(accessdb_file)
322        else:
323            raise service_error(service_error.internal,
324                    "No accessdb specified in config")
325
326        # Grab saved state.  OK to do this w/o locking because it's read only
327        # and only one thread should be in existence that can see self.state at
328        # this point.
329        if self.state_filename:
330            self.read_state()
331
332        # Dispatch tables
333        self.soap_services = {\
334                'New': soap_handler('New', self.new_experiment),
335                'Create': soap_handler('Create', self.create_experiment),
336                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
337                'Vis': soap_handler('Vis', self.get_vis),
338                'Info': soap_handler('Info', self.get_info),
339                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
340                'Terminate': soap_handler('Terminate', 
341                    self.terminate_experiment),
342        }
343
344        self.xmlrpc_services = {\
345                'New': xmlrpc_handler('New', self.new_experiment),
346                'Create': xmlrpc_handler('Create', self.create_experiment),
347                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
348                'Vis': xmlrpc_handler('Vis', self.get_vis),
349                'Info': xmlrpc_handler('Info', self.get_info),
350                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
351                'Terminate': xmlrpc_handler('Terminate',
352                    self.terminate_experiment),
353        }
354
355    # Call while holding self.state_lock
356    def write_state(self):
357        """
358        Write a new copy of experiment state after copying the existing state
359        to a backup.
360
361        State format is a simple pickling of the state dictionary.
362        """
363        if os.access(self.state_filename, os.W_OK):
364            copy_file(self.state_filename, \
365                    "%s.bak" % self.state_filename)
366        try:
367            f = open(self.state_filename, 'w')
368            pickle.dump(self.state, f)
369        except IOError, e:
370            self.log.error("Can't write file %s: %s" % \
371                    (self.state_filename, e))
372        except pickle.PicklingError, e:
373            self.log.error("Pickling problem: %s" % e)
374        except TypeError, e:
375            self.log.error("Pickling problem (TypeError): %s" % e)
376
377    # Call while holding self.state_lock
378    def read_state(self):
379        """
380        Read a new copy of experiment state.  Old state is overwritten.
381
382        State format is a simple pickling of the state dictionary.
383        """
384       
385        def get_experiment_id(state):
386            """
387            Pull the fedid experimentID out of the saved state.  This is kind
388            of a gross walk through the dict.
389            """
390
391            if state.has_key('experimentID'):
392                for e in state['experimentID']:
393                    if e.has_key('fedid'):
394                        return e['fedid']
395                else:
396                    return None
397            else:
398                return None
399
400        def get_alloc_ids(state):
401            """
402            Pull the fedids of the identifiers of each allocation from the
403            state.  Again, a dict dive that's best isolated.
404            """
405
406            return [ f['allocID']['fedid'] 
407                    for f in state.get('federant',[]) \
408                        if f.has_key('allocID') and \
409                            f['allocID'].has_key('fedid')]
410
411
412        try:
413            f = open(self.state_filename, "r")
414            self.state = pickle.load(f)
415            self.log.debug("[read_state]: Read state from %s" % \
416                    self.state_filename)
417        except IOError, e:
418            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
419                    % (self.state_filename, e))
420        except pickle.UnpicklingError, e:
421            self.log.warning(("[read_state]: No saved state: " + \
422                    "Unpickling failed: %s") % e)
423       
424        for s in self.state.values():
425            try:
426
427                eid = get_experiment_id(s)
428                if eid : 
429                    # Give the owner rights to the experiment
430                    self.auth.set_attribute(s['owner'], eid)
431                    # And holders of the eid as well
432                    self.auth.set_attribute(eid, eid)
433                    # allow overrides to control experiments as well
434                    for o in self.overrides:
435                        self.auth.set_attribute(o, eid)
436                    # Set permissions to allow reading of the software repo, if
437                    # any, as well.
438                    for a in get_alloc_ids(s):
439                        self.auth.set_attribute(a, 'repo/%s' % eid)
440                else:
441                    raise KeyError("No experiment id")
442            except KeyError, e:
443                self.log.warning("[read_state]: State ownership or identity " +\
444                        "misformatted in %s: %s" % (self.state_filename, e))
445
446
447    def read_accessdb(self, accessdb_file):
448        """
449        Read the mapping from fedids that can create experiments to their name
450        in the 3-level access namespace.  All will be asserted from this
451        testbed and can include the local username and porject that will be
452        asserted on their behalf by this fedd.  Each fedid is also added to the
453        authorization system with the "create" attribute.
454        """
455        self.accessdb = {}
456        # These are the regexps for parsing the db
457        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
458        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
459                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
460        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\s*(" + name_expr + ")\s*$")
462        lineno = 0
463
464        # Parse the mappings and store in self.authdb, a dict of
465        # fedid -> (proj, user)
466        try:
467            f = open(accessdb_file, "r")
468            for line in f:
469                lineno += 1
470                line = line.strip()
471                if len(line) == 0 or line.startswith('#'):
472                    continue
473                m = project_line.match(line)
474                if m:
475                    fid = fedid(hexstr=m.group(1))
476                    project, user = m.group(2,3)
477                    if not self.accessdb.has_key(fid):
478                        self.accessdb[fid] = []
479                    self.accessdb[fid].append((project, user))
480                    continue
481
482                m = user_line.match(line)
483                if m:
484                    fid = fedid(hexstr=m.group(1))
485                    project = None
486                    user = m.group(2)
487                    if not self.accessdb.has_key(fid):
488                        self.accessdb[fid] = []
489                    self.accessdb[fid].append((project, user))
490                    continue
491                self.log.warn("[experiment_control] Error parsing access " +\
492                        "db %s at line %d" %  (accessdb_file, lineno))
493        except IOError:
494            raise service_error(service_error.internal,
495                    "Error opening/reading %s as experiment " +\
496                            "control accessdb" %  accessdb_file)
497        f.close()
498
499        # Initialize the authorization attributes
500        for fid in self.accessdb.keys():
501            self.auth.set_attribute(fid, 'create')
502            self.auth.set_attribute(fid, 'new')
503
504    def read_mapdb(self, file):
505        """
506        Read a simple colon separated list of mappings for the
507        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
508        """
509
510        self.tbmap = { }
511        lineno =0
512        try:
513            f = open(file, "r")
514            for line in f:
515                lineno += 1
516                line = line.strip()
517                if line.startswith('#') or len(line) == 0:
518                    continue
519                try:
520                    label, url = line.split(':', 1)
521                    self.tbmap[label] = url
522                except ValueError, e:
523                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
524                            "map db: %s %s" % (lineno, line, e))
525        except IOError, e:
526            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
527                    "open %s: %s" % (file, e))
528        f.close()
529       
530    def generate_ssh_keys(self, dest, type="rsa" ):
531        """
532        Generate a set of keys for the gateways to use to talk.
533
534        Keys are of type type and are stored in the required dest file.
535        """
536        valid_types = ("rsa", "dsa")
537        t = type.lower();
538        if t not in valid_types: raise ValueError
539        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
540
541        try:
542            trace = open("/dev/null", "w")
543        except IOError:
544            raise service_error(service_error.internal,
545                    "Cannot open /dev/null??");
546
547        # May raise CalledProcessError
548        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
549        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
550        if rv != 0:
551            raise service_error(service_error.internal, 
552                    "Cannot generate nonce ssh keys.  %s return code %d" \
553                            % (self.ssh_keygen, rv))
554
555    def gentopo(self, str):
556        """
557        Generate the topology dtat structure from the splitter's XML
558        representation of it.
559
560        The topology XML looks like:
561            <experiment>
562                <nodes>
563                    <node><vname></vname><ips>ip1:ip2</ips></node>
564                </nodes>
565                <lans>
566                    <lan>
567                        <vname></vname><vnode></vnode><ip></ip>
568                        <bandwidth></bandwidth><member>node:port</member>
569                    </lan>
570                </lans>
571        """
572        class topo_parse:
573            """
574            Parse the topology XML and create the dats structure.
575            """
576            def __init__(self):
577                # Typing of the subelements for data conversion
578                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
579                self.int_subelements = ( 'bandwidth',)
580                self.float_subelements = ( 'delay',)
581                # The final data structure
582                self.nodes = [ ]
583                self.lans =  [ ]
584                self.topo = { \
585                        'node': self.nodes,\
586                        'lan' : self.lans,\
587                    }
588                self.element = { }  # Current element being created
589                self.chars = ""     # Last text seen
590
591            def end_element(self, name):
592                # After each sub element the contents is added to the current
593                # element or to the appropriate list.
594                if name == 'node':
595                    self.nodes.append(self.element)
596                    self.element = { }
597                elif name == 'lan':
598                    self.lans.append(self.element)
599                    self.element = { }
600                elif name in self.str_subelements:
601                    self.element[name] = self.chars
602                    self.chars = ""
603                elif name in self.int_subelements:
604                    self.element[name] = int(self.chars)
605                    self.chars = ""
606                elif name in self.float_subelements:
607                    self.element[name] = float(self.chars)
608                    self.chars = ""
609
610            def found_chars(self, data):
611                self.chars += data.rstrip()
612
613
614        tp = topo_parse();
615        parser = xml.parsers.expat.ParserCreate()
616        parser.EndElementHandler = tp.end_element
617        parser.CharacterDataHandler = tp.found_chars
618
619        parser.Parse(str)
620
621        return tp.topo
622       
623
624    def genviz(self, topo):
625        """
626        Generate the visualization the virtual topology
627        """
628
629        neato = "/usr/local/bin/neato"
630        # These are used to parse neato output and to create the visualization
631        # file.
632        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
633        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
634                "%s</type></node>"
635
636        try:
637            # Node names
638            nodes = [ n['vname'] for n in topo['node'] ]
639            topo_lans = topo['lan']
640        except KeyError, e:
641            raise service_error(service_error.internal, "Bad topology: %s" %e)
642
643        lans = { }
644        links = { }
645
646        # Walk through the virtual topology, organizing the connections into
647        # 2-node connections (links) and more-than-2-node connections (lans).
648        # When a lan is created, it's added to the list of nodes (there's a
649        # node in the visualization for the lan).
650        for l in topo_lans:
651            if links.has_key(l['vname']):
652                if len(links[l['vname']]) < 2:
653                    links[l['vname']].append(l['vnode'])
654                else:
655                    nodes.append(l['vname'])
656                    lans[l['vname']] = links[l['vname']]
657                    del links[l['vname']]
658                    lans[l['vname']].append(l['vnode'])
659            elif lans.has_key(l['vname']):
660                lans[l['vname']].append(l['vnode'])
661            else:
662                links[l['vname']] = [ l['vnode'] ]
663
664
665        # Open up a temporary file for dot to turn into a visualization
666        try:
667            df, dotname = tempfile.mkstemp()
668            dotfile = os.fdopen(df, 'w')
669        except IOError:
670            raise service_error(service_error.internal,
671                    "Failed to open file in genviz")
672
673        try:
674            dnull = open('/dev/null', 'w')
675        except IOError:
676            service_error(service_error.internal,
677                    "Failed to open /dev/null in genviz")
678
679        # Generate a dot/neato input file from the links, nodes and lans
680        try:
681            print >>dotfile, "graph G {"
682            for n in nodes:
683                print >>dotfile, '\t"%s"' % n
684            for l in links.keys():
685                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
686            for l in lans.keys():
687                for n in lans[l]:
688                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
689            print >>dotfile, "}"
690            dotfile.close()
691        except TypeError:
692            raise service_error(service_error.internal,
693                    "Single endpoint link in vtopo")
694        except IOError:
695            raise service_error(service_error.internal, "Cannot write dot file")
696
697        # Use dot to create a visualization
698        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
699                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
700                close_fds=True)
701        dnull.close()
702
703        # Translate dot to vis format
704        vis_nodes = [ ]
705        vis = { 'node': vis_nodes }
706        for line in dot.stdout:
707            m = vis_re.match(line)
708            if m:
709                vn = m.group(1)
710                vis_node = {'name': vn, \
711                        'x': float(m.group(2)),\
712                        'y' : float(m.group(3)),\
713                    }
714                if vn in links.keys() or vn in lans.keys():
715                    vis_node['type'] = 'lan'
716                else:
717                    vis_node['type'] = 'node'
718                vis_nodes.append(vis_node)
719        rv = dot.wait()
720
721        os.remove(dotname)
722        if rv == 0 : return vis
723        else: return None
724
725    def get_access(self, tb, nodes, user, tbparam, master, export_project,
726            access_user):
727        """
728        Get access to testbed through fedd and set the parameters for that tb
729        """
730        uri = self.tbmap.get(tb, None)
731        if not uri:
732            raise service_error(serice_error.server_config, 
733                    "Unknown testbed: %s" % tb)
734
735        # currently this lumps all users into one service access group
736        service_keys = [ a for u in user \
737                for a in u.get('access', []) \
738                    if a.has_key('sshPubkey')]
739
740        if len(service_keys) == 0:
741            raise service_error(service_error.req, 
742                    "Must have at least one SSH pubkey for services")
743
744        # Tweak search order so that if there are entries in access_user that
745        # have a project matching the export project, we try them first
746        if export_project and export_project.has_key('localname'): 
747            pn = export_project['localname'] 
748               
749            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
750            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
751        else: 
752            access_sequence = access_user
753
754        for p, u in access_sequence: 
755            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
756                    "to %s") %  ((p or "None"), u, uri))
757
758            if p:
759                # Request with user and project specified
760                req = {\
761                        'destinationTestbed' : { 'uri' : uri },
762                        'project': { 
763                            'name': {'localname': p},
764                            'user': [ {'userID': { 'localname': u } } ],
765                            },
766                        'user':  user,
767                        'allocID' : { 'localname': 'test' },
768                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
769                        'serviceAccess' : service_keys
770                    }
771            else:
772                # Request with only user specified
773                req = {\
774                        'destinationTestbed' : { 'uri' : uri },
775                        'user':  [ {'userID': { 'localname': u } } ],
776                        'allocID' : { 'localname': 'test' },
777                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
778                        'serviceAccess' : service_keys
779                    }
780
781            if tb == master:
782                # NB, the export_project parameter is a dict that includes
783                # the type
784                req['exportProject'] = export_project
785
786            # node resources if any
787            if nodes != None and len(nodes) > 0:
788                rnodes = [ ]
789                for n in nodes:
790                    rn = { }
791                    image, hw, count = n.split(":")
792                    if image: rn['image'] = [ image ]
793                    if hw: rn['hardware'] = [ hw ]
794                    if count and int(count) >0 : rn['count'] = int(count)
795                    rnodes.append(rn)
796                req['resources']= { }
797                req['resources']['node'] = rnodes
798
799            try:
800                if self.local_access.has_key(uri):
801                    # Local access call
802                    req = { 'RequestAccessRequestBody' : req }
803                    r = self.local_access[uri].RequestAccess(req, 
804                            fedid(file=self.cert_file))
805                    r = { 'RequestAccessResponseBody' : r }
806                else:
807                    r = self.call_RequestAccess(uri, req, 
808                            self.cert_file, self.cert_pwd, self.trusted_certs)
809            except service_error, e:
810                if e.code == service_error.access:
811                    self.log.debug("[get_access] Access denied")
812                    r = None
813                    continue
814                else:
815                    raise e
816
817            if r.has_key('RequestAccessResponseBody'):
818                # Through to here we have a valid response, not a fault.
819                # Access denied is a fault, so something better or worse than
820                # access denied has happened.
821                r = r['RequestAccessResponseBody']
822                self.log.debug("[get_access] Access granted")
823                break
824            else:
825                raise service_error(service_error.protocol,
826                        "Bad proxy response")
827       
828        if not r:
829            raise service_error(service_error.access, 
830                    "Access denied by %s (%s)" % (tb, uri))
831
832        if r.has_key('emulab'):
833            e = r['emulab']
834            p = e['project']
835            tbparam[tb] = { 
836                    "boss": e['boss'],
837                    "host": e['ops'],
838                    "domain": e['domain'],
839                    "fs": e['fileServer'],
840                    "eventserver": e['eventServer'],
841                    "project": unpack_id(p['name']),
842                    "emulab" : e,
843                    "allocID" : r['allocID'],
844                    "uri": uri,
845                    }
846            # Make the testbed name be the label the user applied
847            p['testbed'] = {'localname': tb }
848
849            for u in p['user']:
850                role = u.get('role', None)
851                if role == 'experimentCreation':
852                    tbparam[tb]['user'] = unpack_id(u['userID'])
853                    break
854            else:
855                raise service_error(service_error.internal, 
856                        "No createExperimentUser from %s" %tb)
857            # Add attributes to parameter space.  We don't allow attributes to
858            # overlay any parameters already installed.
859            for a in e['fedAttr']:
860                try:
861                    if a['attribute'] and \
862                            isinstance(a['attribute'], basestring)\
863                            and not tbparam[tb].has_key(a['attribute'].lower()):
864                        tbparam[tb][a['attribute'].lower()] = a['value']
865                except KeyError:
866                    self.log.error("Bad attribute in response: %s" % a)
867        else:
868            tbparam[tb] = { 
869                "allocID" : r['allocID'],
870                "uri": uri,
871            }
872
873    def release_access(self, tb, aid, uri=None):
874        """
875        Release access to testbed through fedd
876        """
877
878        if not uri:
879            uri = self.tbmap.get(tb, None)
880        if not uri:
881            raise service_error(service_error.server_config, 
882                    "Unknown testbed: %s" % tb)
883
884        if self.local_access.has_key(uri):
885            resp = self.local_access[uri].ReleaseAccess(\
886                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
887                    fedid(file=self.cert_file))
888            resp = { 'ReleaseAccessResponseBody': resp } 
889        else:
890            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
891                    self.cert_file, self.cert_pwd, self.trusted_certs)
892
893        # better error coding
894
895    def remote_splitter(self, uri, desc, master):
896
897        req = {
898                'description' : { 'ns2description': desc },
899                'master': master,
900                'include_fedkit': bool(self.fedkit),
901                'include_gatewaykit': bool(self.gatewaykit)
902            }
903
904        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
905                self.trusted_certs)
906
907        if r.has_key('Ns2SplitResponseBody'):
908            r = r['Ns2SplitResponseBody']
909            if r.has_key('output'):
910                return r['output'].splitlines()
911            else:
912                raise service_error(service_error.protocol, 
913                        "Bad splitter response (no output)")
914        else:
915            raise service_error(service_error.protocol, "Bad splitter response")
916
917    class start_segment:
918        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
919                cert_pwd=None, trusted_certs=None, caller=None,
920                log_collector=None):
921            self.log = log
922            self.debug = debug
923            self.cert_file = cert_file
924            self.cert_pwd = cert_pwd
925            self.trusted_certs = None
926            self.caller = caller
927            self.testbed = testbed
928            self.log_collector = log_collector
929            self.response = None
930
931        def __call__(self, uri, aid, topo, master, attrs=None):
932            req = {
933                    'allocID': { 'fedid' : aid }, 
934                    'segmentdescription': { 
935                        'topdldescription': topo.to_dict(),
936                    },
937                    'master': master,
938                }
939            if attrs:
940                req['fedAttr'] = attrs
941
942            try:
943                self.log.debug("Calling StartSegment at %s " % uri)
944                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
945                        self.trusted_certs)
946                if r.has_key('StartSegmentResponseBody'):
947                    lval = r['StartSegmentResponseBody'].get('allocationLog',
948                            None)
949                    if lval and self.log_collector:
950                        for line in  lval.splitlines(True):
951                            self.log_collector.write(line)
952                    self.response = r
953                else:
954                    raise service_error(service_error.internal, 
955                            "Bad response!?: %s" %r)
956                return True
957            except service_error, e:
958                self.log.error("Start segment failed on %s: %s" % \
959                        (self.testbed, e))
960                return False
961
962
963
964    class terminate_segment:
965        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
966                cert_pwd=None, trusted_certs=None, caller=None):
967            self.log = log
968            self.debug = debug
969            self.cert_file = cert_file
970            self.cert_pwd = cert_pwd
971            self.trusted_certs = None
972            self.caller = caller
973            self.testbed = testbed
974
975        def __call__(self, uri, aid ):
976            req = {
977                    'allocID': aid , 
978                }
979            try:
980                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
981                        self.trusted_certs)
982                return True
983            except service_error, e:
984                self.log.error("Terminate segment failed on %s: %s" % \
985                        (self.testbed, e))
986                return False
987   
988
989    def allocate_resources(self, allocated, master, eid, expid, expcert, 
990            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
991            attrs=None):
992        def get_vlan(r):
993            if r.has_key('StartSegmentResponseBody'):
994                srb = r['StartSegmentResponseBody']
995                if srb.has_key('fedAttr'):
996                    for k, v in [ (a['attribute'], a['value']) \
997                            for a in srb['fedAttr']]:
998                        if k == 'vlan': return v
999            return None
1000
1001        started = { }           # Testbeds where a sub-experiment started
1002                                # successfully
1003
1004        # XXX
1005        fail_soft = False
1006
1007        slaves = [ k for k in allocated.keys() \
1008                if k != master and not topo[k].get_attribute('transit')]
1009        transit = [ k for k in allocated.keys() \
1010                if topo[k].get_attribute('transit')]
1011
1012        log = alloc_log or self.log
1013
1014        thread_pool = self.thread_pool(self.nthreads)
1015        threads = [ ]
1016
1017        for tb in transit:
1018            uri = tbparams[tb]['uri']
1019            if tbparams[tb].has_key('allocID') and \
1020                    tbparams[tb]['allocID'].has_key('fedid'):
1021                aid = tbparams[tb]['allocID']['fedid']
1022            else:
1023                raise service_error(service_error.internal, 
1024                        "No alloc id for testbed %s !?" % tb)
1025
1026            m = re.search('(\d+)', tb)
1027            if m:
1028                to_repl = "unassigned%s" % m.group(1)
1029            else:
1030                raise service_error(service_error.internal, 
1031                        "Bad dynamic allocation name")
1032                break
1033
1034            ss = self.start_segment(log=log, debug=self.debug, 
1035                testbed=tb, cert_file=self.cert_file, 
1036                cert_pwd=self.cert_pwd, 
1037                trusted_certs=self.trusted_certs,
1038                caller=self.call_StartSegment,
1039                log_collector=log_collector)
1040            t = self.pooled_thread(
1041                    target=ss,
1042                    args =(uri, aid, topo[tb], False, attrs),
1043                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1044            threads.append(t)
1045            t.start()
1046            # Wait until the this transit node finishes (keep pinging the log,
1047            # though)
1048
1049            mins = 0
1050            while not thread_pool.wait_for_all_done(60.0):
1051                mins += 1
1052                alloc_log.info("Waiting for transit (it has been %d mins)" \
1053                        % mins)
1054
1055            if t.rv:
1056                vlan = get_vlan(ss.response)
1057                if vlan is not None:
1058                    for k, t in topo.items():
1059                        for e in t.elements:
1060                            for i in e.interface:
1061                                vl = i.get_attribute('dragon_vlan')
1062                                if vl is not None and vl == to_repl:
1063                                    i.set_attribute('dragon_vlan', vlan)
1064            else:
1065                break
1066            thread_pool.clear()
1067
1068
1069        failed = [ t.getName() for t in threads if not t.rv ]
1070
1071        if len(failed) == 0:
1072            for tb in slaves:
1073                # Create and start a thread to start the segment, and save it
1074                # to get the return value later
1075                thread_pool.wait_for_slot()
1076                uri = self.tbmap.get(tb, None)
1077                if not uri:
1078                    raise service_error(service_error.internal, 
1079                            "Unknown testbed %s !?" % tb)
1080
1081                if tbparams[tb].has_key('allocID') and \
1082                        tbparams[tb]['allocID'].has_key('fedid'):
1083                    aid = tbparams[tb]['allocID']['fedid']
1084                else:
1085                    raise service_error(service_error.internal, 
1086                            "No alloc id for testbed %s !?" % tb)
1087
1088                t  = self.pooled_thread(\
1089                        target=self.start_segment(log=log, debug=self.debug,
1090                            testbed=tb, cert_file=self.cert_file,
1091                            cert_pwd=self.cert_pwd,
1092                            trusted_certs=self.trusted_certs,
1093                            caller=self.call_StartSegment,
1094                            log_collector=log_collector), 
1095                        args=(uri, aid, topo[tb], False, attrs), name=tb,
1096                        pdata=thread_pool, trace_file=self.trace_file)
1097                threads.append(t)
1098                t.start()
1099
1100            # Wait until all finish (keep pinging the log, though)
1101            mins = 0
1102            while not thread_pool.wait_for_all_done(60.0):
1103                mins += 1
1104                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1105                        % mins)
1106
1107            thread_pool.clear()
1108
1109        # If none failed, start the master
1110        failed = [ t.getName() for t in threads if not t.rv ]
1111
1112        if len(failed) == 0:
1113            uri = self.tbmap.get(master, None)
1114            if not uri:
1115                raise service_error(service_error.internal, 
1116                        "Unknown testbed %s !?" % master)
1117
1118            if tbparams[master].has_key('allocID') and \
1119                    tbparams[master]['allocID'].has_key('fedid'):
1120                aid = tbparams[master]['allocID']['fedid']
1121            else:
1122                raise service_error(service_error.internal, 
1123                    "No alloc id for testbed %s !?" % master)
1124            t = self.pooled_thread(
1125                    target=self.start_segment(log=log, debug=self.debug, 
1126                        testbed=master, cert_file=self.cert_file, 
1127                        cert_pwd=self.cert_pwd, 
1128                        trusted_certs=self.trusted_certs,
1129                        caller=self.call_StartSegment,
1130                        log_collector=log_collector),
1131                    args =(uri, aid, topo[master], True, attrs),
1132                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1133            threads.append(t)
1134            t.start()
1135            # Wait until the master finishes (keep pinging the log, though)
1136            mins = 0
1137            while not thread_pool.wait_for_all_done(60.0):
1138                mins += 1
1139                alloc_log.info("Waiting for master (it has been %d mins)" \
1140                        % mins)
1141            # update failed to include the master, if it failed
1142            failed = [ t.getName() for t in threads if not t.rv ]
1143
1144        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1145        # If one failed clean up, unless fail_soft is set
1146        if failed:
1147            if not fail_soft:
1148                thread_pool.clear()
1149                for tb in succeeded:
1150                    # Create and start a thread to stop the segment
1151                    thread_pool.wait_for_slot()
1152                    uri = tbparams[tb]['uri']
1153                    t  = self.pooled_thread(\
1154                            target=self.terminate_segment(log=log,
1155                                testbed=tb,
1156                                cert_file=self.cert_file, 
1157                                cert_pwd=self.cert_pwd,
1158                                trusted_certs=self.trusted_certs,
1159                                caller=self.call_TerminateSegment),
1160                            args=(uri, tbparams[tb]['federant']['allocID']),
1161                            name=tb,
1162                            pdata=thread_pool, trace_file=self.trace_file)
1163                    t.start()
1164                # Wait until all finish
1165                thread_pool.wait_for_all_done()
1166
1167                # release the allocations
1168                for tb in tbparams.keys():
1169                    self.release_access(tb, tbparams[tb]['allocID'],
1170                            tbparams[tb].get('uri', None))
1171                # Remove the placeholder
1172                self.state_lock.acquire()
1173                self.state[eid]['experimentStatus'] = 'failed'
1174                if self.state_filename: self.write_state()
1175                self.state_lock.release()
1176
1177                log.error("Swap in failed on %s" % ",".join(failed))
1178                return
1179        else:
1180            log.info("[start_segment]: Experiment %s active" % eid)
1181
1182
1183        # Walk up tmpdir, deleting as we go
1184        if self.cleanup:
1185            log.debug("[start_experiment]: removing %s" % tmpdir)
1186            for path, dirs, files in os.walk(tmpdir, topdown=False):
1187                for f in files:
1188                    os.remove(os.path.join(path, f))
1189                for d in dirs:
1190                    os.rmdir(os.path.join(path, d))
1191            os.rmdir(tmpdir)
1192        else:
1193            log.debug("[start_experiment]: not removing %s" % tmpdir)
1194
1195        # Insert the experiment into our state and update the disk copy
1196        self.state_lock.acquire()
1197        self.state[expid]['experimentStatus'] = 'active'
1198        self.state[eid] = self.state[expid]
1199        if self.state_filename: self.write_state()
1200        self.state_lock.release()
1201        return
1202
1203
1204    def add_kit(self, e, kit):
1205        """
1206        Add a Software object created from the list of (install, location)
1207        tuples passed as kit  to the software attribute of an object e.  We
1208        do this enough to break out the code, but it's kind of a hack to
1209        avoid changing the old tuple rep.
1210        """
1211
1212        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1213
1214        if isinstance(e.software, list): e.software.extend(s)
1215        else: e.software = s
1216
1217
1218    def create_experiment_state(self, fid, req, expid, expcert, 
1219            state='starting'):
1220        """
1221        Create the initial entry in the experiment's state.  The expid and
1222        expcert are the experiment's fedid and certifacte that represents that
1223        ID, which are installed in the experiment state.  If the request
1224        includes a suggested local name that is used if possible.  If the local
1225        name is already taken by an experiment owned by this user that has
1226        failed, it is overwritten.  Otherwise new letters are added until a
1227        valid localname is found.  The generated local name is returned.
1228        """
1229
1230        if req.has_key('experimentID') and \
1231                req['experimentID'].has_key('localname'):
1232            overwrite = False
1233            eid = req['experimentID']['localname']
1234            # If there's an old failed experiment here with the same local name
1235            # and accessible by this user, we'll overwrite it, otherwise we'll
1236            # fall through and do the collision avoidance.
1237            old_expid = self.get_experiment_fedid(eid)
1238            if old_expid and self.check_experiment_access(fid, old_expid):
1239                self.state_lock.acquire()
1240                status = self.state[eid].get('experimentStatus', None)
1241                if status and status == 'failed':
1242                    # remove the old access attribute
1243                    self.auth.unset_attribute(fid, old_expid)
1244                    overwrite = True
1245                    del self.state[eid]
1246                    del self.state[old_expid]
1247                self.state_lock.release()
1248            self.state_lock.acquire()
1249            while (self.state.has_key(eid) and not overwrite):
1250                eid += random.choice(string.ascii_letters)
1251            # Initial state
1252            self.state[eid] = {
1253                    'experimentID' : \
1254                            [ { 'localname' : eid }, {'fedid': expid } ],
1255                    'experimentStatus': state,
1256                    'experimentAccess': { 'X509' : expcert },
1257                    'owner': fid,
1258                    'log' : [],
1259                }
1260            self.state[expid] = self.state[eid]
1261            if self.state_filename: self.write_state()
1262            self.state_lock.release()
1263        else:
1264            eid = self.exp_stem
1265            for i in range(0,5):
1266                eid += random.choice(string.ascii_letters)
1267            self.state_lock.acquire()
1268            while (self.state.has_key(eid)):
1269                eid = self.exp_stem
1270                for i in range(0,5):
1271                    eid += random.choice(string.ascii_letters)
1272            # Initial state
1273            self.state[eid] = {
1274                    'experimentID' : \
1275                            [ { 'localname' : eid }, {'fedid': expid } ],
1276                    'experimentStatus': state,
1277                    'experimentAccess': { 'X509' : expcert },
1278                    'owner': fid,
1279                    'log' : [],
1280                }
1281            self.state[expid] = self.state[eid]
1282            if self.state_filename: self.write_state()
1283            self.state_lock.release()
1284
1285        return eid
1286
1287
1288    def allocate_ips_to_topo(self, top):
1289        """
1290        Add an ip4_address attribute to all the hosts in the topology, based on
1291        the shared substrates on which they sit.  An /etc/hosts file is also
1292        created and returned as a list of hostfiles entries.  We also return
1293        the allocator, because we may need to allocate IPs to portals
1294        (specifically DRAGON portals).
1295        """
1296        subs = sorted(top.substrates, 
1297                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1298                reverse=True)
1299        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1300        ifs = { }
1301        hosts = [ ]
1302
1303        for idx, s in enumerate(subs):
1304            a = ips.allocate(len(s.interfaces)+2)
1305            if a :
1306                base, num = a
1307                if num < len(s.interfaces) +2 : 
1308                    raise service_error(service_error.internal,
1309                            "Allocator returned wrong number of IPs??")
1310            else:
1311                raise service_error(service_error.req, 
1312                        "Cannot allocate IP addresses")
1313
1314            base += 1
1315            for i in s.interfaces:
1316                i.attribute.append(
1317                        topdl.Attribute('ip4_address', 
1318                            "%s" % ip_addr(base)))
1319                hname = i.element.name[0]
1320                if ifs.has_key(hname):
1321                    hosts.append("%s\t%s-%s %s-%d" % \
1322                            (ip_addr(base), hname, s.name, hname,
1323                                ifs[hname]))
1324                else:
1325                    ifs[hname] = 0
1326                    hosts.append("%s\t%s-%s %s-%d %s" % \
1327                            (ip_addr(base), hname, s.name, hname,
1328                                ifs[hname], hname))
1329
1330                ifs[hname] += 1
1331                base += 1
1332        return hosts, ips
1333
1334    def get_access_to_testbeds(self, testbeds, user, access_user, 
1335            export_project, master, allocated, tbparams):
1336        """
1337        Request access to the various testbeds required for this instantiation
1338        (passed in as testbeds).  User, access_user, expoert_project and master
1339        are used to construct the correct requests.  Per-testbed parameters are
1340        returned in tbparams.
1341        """
1342        for tb in testbeds:
1343            self.get_access(tb, None, user, tbparams, master,
1344                    export_project, access_user)
1345            allocated[tb] = 1
1346
1347    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1348        """
1349        Create the sub-topologies that are needed for experimetn instantiation.
1350        Along the way attach startup commands to the computers in the
1351        subtopologies.
1352        """
1353        for tb in testbeds:
1354            topo[tb] = top.clone()
1355            to_delete = [ ]
1356            for e in topo[tb].elements:
1357                etb = e.get_attribute('testbed')
1358                if etb and etb != tb:
1359                    for i in e.interface:
1360                        for s in i.subs:
1361                            try:
1362                                s.interfaces.remove(i)
1363                            except ValueError:
1364                                raise service_error(service_error.internal,
1365                                        "Can't remove interface??")
1366                    to_delete.append(e)
1367            for e in to_delete:
1368                topo[tb].elements.remove(e)
1369            topo[tb].make_indices()
1370
1371            for e in [ e for e in topo[tb].elements \
1372                    if isinstance(e,topdl.Computer)]:
1373                if tb == master:
1374                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1375                else:
1376                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1377                scmd = e.get_attribute('startup')
1378                if scmd:
1379                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1380
1381                e.set_attribute('startup', cmd)
1382                if self.fedkit: self.add_kit(e, self.fedkit)
1383
1384    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1385            portal_type, iface_desc=()):
1386        sproject = tbparams[st].get('project', 'project')
1387        dproject = tbparams[dt].get('project', 'project')
1388        mproject = tbparams[master].get('project', 'project')
1389        sdomain = tbparams[st].get('domain', ".example.com")
1390        ddomain = tbparams[dt].get('domain', ".example.com")
1391        mdomain = tbparams[master].get('domain', '.example.com')
1392        muser = tbparams[master].get('user', 'root')
1393        smbshare = tbparams[master].get('smbshare', 'USERS')
1394        aid = tbparams[dt]['allocID']['fedid']
1395        if st == master or dt == master:
1396            active = ("%s" % (st == master))
1397        else:
1398            active = ("%s" %(st > dt))
1399
1400        ifaces = [ ]
1401        for sub, attrs in iface_desc:
1402            inf = topdl.Interface(
1403                    substrate=sub,
1404                    attribute=[
1405                        topdl.Attribute(
1406                            attribute=n,
1407                            value = v)
1408                        for n, v in attrs
1409                        ]
1410                    )
1411            ifaces.append(inf)
1412        return topdl.Computer(
1413                name=myname,
1414                attribute=[ 
1415                    topdl.Attribute(attribute=n,value=v)
1416                        for n, v in (\
1417                            ('portal', 'true'),
1418                            ('domain', sdomain),
1419                            ('masterdomain', mdomain),
1420                            ('masterexperiment', "%s/%s" % \
1421                                    (mproject, eid)),
1422                            ('masteruser', muser),
1423                            ('smbshare', smbshare),
1424                            ('experiment', "%s/%s" % \
1425                                    (sproject, eid)),
1426                            ('peer', "%s" % desthost),
1427                            ('peer_segment', "%s" % aid),
1428                            ('scriptdir', 
1429                                "/usr/local/federation/bin"),
1430                            ('active', "%s" % active),
1431                            ('portal_type', portal_type), 
1432                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl >& /tmp/bridge.log'))
1433                    ],
1434                interface=ifaces,
1435                )
1436
1437    def new_portal_substrate(self, st, dt, eid, tbparams):
1438        ddomain = tbparams[dt].get('domain', ".example.com")
1439        dproject = tbparams[dt].get('project', 'project')
1440        tsubstrate = \
1441                topdl.Substrate(name='%s-%s' % (st, dt),
1442                        attribute= [
1443                            topdl.Attribute(
1444                                attribute='portal',
1445                                value='true')
1446                            ]
1447                        )
1448        segment_element = topdl.Segment(
1449                id= tbparams[dt]['allocID'],
1450                type='emulab',
1451                uri = self.tbmap.get(dt, None),
1452                interface=[ 
1453                    topdl.Interface(
1454                        substrate=tsubstrate.name),
1455                    ],
1456                attribute = [
1457                    topdl.Attribute(attribute=n, value=v)
1458                        for n, v in (\
1459                            ('domain', ddomain),
1460                            ('experiment', "%s/%s" % \
1461                                    (dproject, eid)),)
1462                    ],
1463                )
1464
1465        return (tsubstrate, segment_element)
1466
1467    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1468        if sub.capacity is None:
1469            raise service_error(service_error.internal,
1470                    "Cannot DRAGON split substrate w/o capacity")
1471        segs = [ ]
1472        substr = topdl.Substrate(name="dragon%d" % idx, 
1473                capacity=sub.capacity.clone(),
1474                attribute=[ topdl.Attribute(attribute=n, value=v)
1475                    for n, v, in (\
1476                            ('vlan', 'unassigned%d' % idx),)])
1477        for tb in tbs.keys():
1478            seg = topdl.Segment(
1479                    id = tbparams[tb]['allocID'],
1480                    type='emulab',
1481                    uri = self.tbmap.get(tb, None),
1482                    interface=[ 
1483                        topdl.Interface(
1484                            substrate=substr.name),
1485                        ],
1486                    attribute=[ topdl.Attribute(
1487                        attribute='dragon_endpoint', 
1488                        value=tbparams[tb]['dragon']),
1489                        ]
1490                    )
1491            if tbparams[tb].has_key('vlans'):
1492                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1493            segs.append(seg)
1494
1495        topo["dragon%d" %idx] = \
1496                topdl.Topology(substrates=[substr], elements=segs,
1497                        attribute=[
1498                            topdl.Attribute(attribute="transit", value='true'),
1499                            topdl.Attribute(attribute="dynamic", value='true'),
1500                            topdl.Attribute(attribute="testbed", value='dragon'),
1501                            ]
1502                        )
1503
1504    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1505        """
1506        Add attribiutes to the various elements indicating that they are to be
1507        dragon connected and create a dragon segment in tops to be
1508        instantiated.
1509        """
1510
1511        def get_substrate_from_topo(name, t):
1512            for s in t.substrates:
1513                if s.name == name: return s
1514            else: return None
1515
1516        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1517        elements = [ i.element for i in sub.interfaces ]
1518        count = { }
1519        for e in elements:
1520            tb = e.get_attribute('testbed')
1521            count[tb] = count.get(tb, 0) + 1
1522
1523        for tb in tbs.keys():
1524            s = get_substrate_from_topo(sub.name, topo[tb])
1525            if s:
1526                for i in s.interfaces:
1527                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1528                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1529                    else: i.set_attribute('dragon_type', 'link')
1530            else:
1531                raise service_error(service_error.internal,
1532                        "No substrate %s in testbed %s" % (sub.name, tb))
1533
1534        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1535
1536    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1537            segment_substrate, portals):
1538        # More than one testbed is on this substrate.  Insert
1539        # some portals into the subtopologies.  st == source testbed,
1540        # dt == destination testbed.
1541        for st in tbs.keys():
1542            if not segment_substrate.has_key(st):
1543                segment_substrate[st] = { }
1544            if not portals.has_key(st): 
1545                portals[st] = { }
1546            for dt in [ t for t in tbs.keys() if t != st]:
1547                sproject = tbparams[st].get('project', 'project')
1548                dproject = tbparams[dt].get('project', 'project')
1549                mproject = tbparams[master].get('project', 'project')
1550                sdomain = tbparams[st].get('domain', ".example.com")
1551                ddomain = tbparams[dt].get('domain', ".example.com")
1552                mdomain = tbparams[master].get('domain', '.example.com')
1553                muser = tbparams[master].get('user', 'root')
1554                smbshare = tbparams[master].get('smbshare', 'USERS')
1555                aid = tbparams[dt]['allocID']['fedid']
1556                if st == master or dt == master:
1557                    active = ("%s" % (st == master))
1558                else:
1559                    active = ("%s" %(st > dt))
1560                if not segment_substrate[st].has_key(dt):
1561                    # Put a substrate and a segment for the connected
1562                    # testbed in there.
1563                    tsubstrate, segment_element = \
1564                            self.new_portal_substrate(st, dt, eid, tbparams)
1565                    segment_substrate[st][dt] = tsubstrate
1566                    topo[st].substrates.append(tsubstrate)
1567                    topo[st].elements.append(segment_element)
1568
1569                new_portal = False
1570                if portals[st].has_key(dt):
1571                    # There's a portal set up to go to this destination.
1572                    # See if there's room to multiples this connection on
1573                    # it.  If so, add an interface to the portal; if not,
1574                    # set up to add a portal below.
1575                    # [This little festival of braces is just a pop of the
1576                    # last element in the list of portals between st and
1577                    # dt.]
1578                    portal = portals[st][dt][-1]
1579                    mux = len([ i for i in portal.interface \
1580                            if not i.get_attribute('portal')])
1581                    if mux == self.muxmax:
1582                        new_portal = True
1583                        portal_type = "experiment"
1584                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1585                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1586                    else:
1587                        new_i = topdl.Interface(
1588                                substrate=sub.name,
1589                                attribute=[ 
1590                                    topdl.Attribute(
1591                                        attribute='ip4_address', 
1592                                        value=tbs[dt]
1593                                    )
1594                                ])
1595                        portal.interface.append(new_i)
1596                else:
1597                    # First connection to this testbed, make an empty list
1598                    # and set up to add the new portal below
1599                    new_portal = True
1600                    portals[st][dt] = [ ]
1601                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1602                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1603
1604                    if dt == master or st == master: portal_type = "both"
1605                    else: portal_type = "experiment"
1606
1607                if new_portal:
1608                    infs = (
1609                            (segment_substrate[st][dt].name, 
1610                                (('portal', 'true'),)),
1611                            (sub.name, 
1612                                (('ip4_address', tbs[dt]),))
1613                        )
1614                    portal =  self.new_portal_node(st, dt, tbparams, 
1615                            master, eid, myname, desthost, portal_type,
1616                            infs)
1617                    if self.fedkit:
1618                        self.add_kit(portal, self.fedkit)
1619                    if self.gatewaykit: 
1620                        self.add_kit(portal, self.gatewaykit)
1621
1622                    topo[st].elements.append(portal)
1623                    portals[st][dt].append(portal)
1624
1625    def add_control_portal(self, st, dt, master, eid, topo, tbparams):
1626        # Add to the master testbed
1627        tsubstrate, segment_element = \
1628                self.new_portal_substrate(st, dt, eid, tbparams)
1629        myname = "%stunnel" % dt
1630        desthost = "%stunnel" % st
1631
1632        portal = self.new_portal_node(st, dt, tbparams, master,
1633                eid, myname, desthost, "control", 
1634                ((tsubstrate.name,(('portal','true'),)),))
1635        if self.fedkit:
1636            self.add_kit(portal, self.fedkit)
1637        if self.gatewaykit: 
1638            self.add_kit(portal, self.gatewaykit)
1639
1640        topo[st].substrates.append(tsubstrate)
1641        topo[st].elements.append(segment_element)
1642        topo[st].elements.append(portal)
1643
1644    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1645            substrate, tbparams):
1646        # Add to the master testbed
1647        myname = "%stunnel" % dt
1648        desthost = "%s" % ip_addr(dip)
1649
1650        portal = self.new_portal_node(st, dt, tbparams, master,
1651                eid, myname, desthost, "control", 
1652                ((substrate.name,(
1653                    ('portal','true'),
1654                    ('ip4_address', "%s" % ip_addr(myip)),
1655                    ('dragon_vlan', 'unassigned%d' % idx),
1656                    ('dragon_type', 'link'),)),))
1657        if self.fedkit:
1658            self.add_kit(portal, self.fedkit)
1659        if self.gatewaykit: 
1660            self.add_kit(portal, self.gatewaykit)
1661
1662        return portal
1663
1664    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator):
1665        """
1666        For each substrate in the main topology, find those that
1667        have nodes on more than one testbed.  Insert portal nodes
1668        into the copies of those substrates on the sub topologies.
1669        """
1670        segment_substrate = { }
1671        portals = { }
1672        for s in top.substrates:
1673            # tbs will contain an ip address on this subsrate that is in
1674            # each testbed.
1675            tbs = { }
1676            for i in s.interfaces:
1677                e = i.element
1678                tb = e.get_attribute('testbed')
1679                if tb and not tbs.has_key(tb):
1680                    for i in e.interface:
1681                        if s in i.subs:
1682                            tbs[tb]= i.get_attribute('ip4_address')
1683            if len(tbs) < 2:
1684                continue
1685
1686            # DRAGON will not create multi-site vlans yet
1687            if len(tbs) == 2 and \
1688                    all([tbparams[x].has_key('dragon') for x in tbs]):
1689                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1690                        master, eid)
1691            else:
1692                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1693                        eid, segment_substrate, portals)
1694
1695        # Make sure that all the slaves have a control portal back to the
1696        # master.
1697        for tb in [ t for t in tbparams.keys() if t != master ]:
1698            if len([e for e in topo[tb].elements \
1699                    if isinstance(e, topdl.Computer) and \
1700                    e.get_attribute('portal') and \
1701                    e.get_attribute('portal_type') == 'both']) == 0:
1702
1703                if tbparams[master].has_key('dragon') \
1704                        and tbparams[tb].has_key('dragon'):
1705
1706                    idx = len([x for x in topo.keys() \
1707                            if x.startswith('dragon')])
1708                    dip, leng = ip_allocator.allocate(4)
1709                    dip += 1
1710                    mip = dip+1
1711                    csub = topdl.Substrate(
1712                            name="dragon-control-%s" % tb,
1713                            capacity=topdl.Capacity(100000.0, 'max'),
1714                            attribute=[
1715                                topdl.Attribute(
1716                                    attribute='portal',
1717                                    value='true'
1718                                    )
1719                                ]
1720                            )
1721                    seg = topdl.Segment(
1722                            id= tbparams[master]['allocID'],
1723                            type='emulab',
1724                            uri = self.tbmap.get(master, None),
1725                            interface=[ 
1726                                topdl.Interface(
1727                                    substrate=csub.name),
1728                                ],
1729                            attribute = [
1730                                topdl.Attribute(attribute=n, value=v)
1731                                    for n, v in (\
1732                                        ('domain', 
1733                                            tbparams[master].get('domain',
1734                                                ".example.com")),
1735                                        ('experiment', "%s/%s" % \
1736                                                (tbparams[master].get(
1737                                                    'project', 
1738                                                    'project'), 
1739                                                    eid)),)
1740                                ],
1741                            )
1742                    topo[tb].substrates.append(csub)
1743                    topo[tb].elements.append(
1744                            self.new_dragon_portal(tb, master, master, eid, 
1745                                dip, mip, idx, csub, tbparams))
1746                    topo[tb].elements.append(seg)
1747
1748                    mcsub = csub.clone()
1749                    seg = topdl.Segment(
1750                            id= tbparams[tb]['allocID'],
1751                            type='emulab',
1752                            uri = self.tbmap.get(tb, None),
1753                            interface=[ 
1754                                topdl.Interface(
1755                                    substrate=csub.name),
1756                                ],
1757                            attribute = [
1758                                topdl.Attribute(attribute=n, value=v)
1759                                    for n, v in (\
1760                                        ('domain', 
1761                                            tbparams[tb].get('domain',
1762                                                ".example.com")),
1763                                        ('experiment', "%s/%s" % \
1764                                                (tbparams[tb].get('project', 
1765                                                    'project'), 
1766                                                    eid)),)
1767                                ],
1768                            )
1769                    topo[master].substrates.append(mcsub)
1770                    topo[master].elements.append(
1771                            self.new_dragon_portal(master, tb, master, eid, 
1772                                mip, dip, idx, mcsub, tbparams))
1773                    topo[master].elements.append(seg)
1774
1775                    self.create_dragon_substrate(csub, topo, 
1776                            {tb: 1, master:1}, tbparams, master, eid)
1777                else:
1778                    self.add_control_portal(master, tb, master, eid, topo, 
1779                            tbparams)
1780                    self.add_control_portal(tb, master, master, eid, topo, 
1781                            tbparams)
1782
1783        # Connect the portal nodes into the topologies and clear out
1784        # substrates that are not in the topologies
1785        for tb in tbparams.keys():
1786            topo[tb].incorporate_elements()
1787            topo[tb].substrates = \
1788                    [s for s in topo[tb].substrates \
1789                        if len(s.interfaces) >0]
1790
1791    def wrangle_software(self, expid, top, topo, tbparams):
1792        """
1793        Copy software out to the repository directory, allocate permissions and
1794        rewrite the segment topologies to look for the software in local
1795        places.
1796        """
1797
1798        # Copy the rpms and tarfiles to a distribution directory from
1799        # which the federants can retrieve them
1800        linkpath = "%s/software" %  expid
1801        softdir ="%s/%s" % ( self.repodir, linkpath)
1802        softmap = { }
1803        # These are in a list of tuples format (each kit).  This comprehension
1804        # unwraps them into a single list of tuples that initilaizes the set of
1805        # tuples.
1806        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1807                for p, t in l ])
1808        pkgs.update([x.location for e in top.elements \
1809                for x in e.software])
1810        try:
1811            os.makedirs(softdir)
1812        except IOError, e:
1813            raise service_error(
1814                    "Cannot create software directory: %s" % e)
1815        # The actual copying.  Everything's converted into a url for copying.
1816        for pkg in pkgs:
1817            loc = pkg
1818
1819            scheme, host, path = urlparse(loc)[0:3]
1820            dest = os.path.basename(path)
1821            if not scheme:
1822                if not loc.startswith('/'):
1823                    loc = "/%s" % loc
1824                loc = "file://%s" %loc
1825            try:
1826                u = urlopen(loc)
1827            except Exception, e:
1828                raise service_error(service_error.req, 
1829                        "Cannot open %s: %s" % (loc, e))
1830            try:
1831                f = open("%s/%s" % (softdir, dest) , "w")
1832                self.log.debug("Writing %s/%s" % (softdir,dest) )
1833                data = u.read(4096)
1834                while data:
1835                    f.write(data)
1836                    data = u.read(4096)
1837                f.close()
1838                u.close()
1839            except Exception, e:
1840                raise service_error(service_error.internal,
1841                        "Could not copy %s: %s" % (loc, e))
1842            path = re.sub("/tmp", "", linkpath)
1843            # XXX
1844            softmap[pkg] = \
1845                    "https://users.isi.deterlab.net:23235/%s/%s" %\
1846                    ( path, dest)
1847
1848            # Allow the individual segments to access the software.
1849            for tb in tbparams.keys():
1850                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1851                        "/%s/%s" % ( path, dest))
1852
1853        # Convert the software locations in the segments into the local
1854        # copies on this host
1855        for soft in [ s for tb in topo.values() \
1856                for e in tb.elements \
1857                    if getattr(e, 'software', False) \
1858                        for s in e.software ]:
1859            if softmap.has_key(soft.location):
1860                soft.location = softmap[soft.location]
1861
1862
1863    def new_experiment(self, req, fid):
1864        """
1865        The external interface to empty initial experiment creation called from
1866        the dispatcher.
1867
1868        Creates a working directory, splits the incoming description using the
1869        splitter script and parses out the avrious subsections using the
1870        lcasses above.  Once each sub-experiment is created, use pooled threads
1871        to instantiate them and start it all up.
1872        """
1873        if not self.auth.check_attribute(fid, 'new'):
1874            raise service_error(service_error.access, "New access denied")
1875
1876        try:
1877            tmpdir = tempfile.mkdtemp(prefix="split-")
1878        except IOError:
1879            raise service_error(service_error.internal, "Cannot create tmp dir")
1880
1881        try:
1882            access_user = self.accessdb[fid]
1883        except KeyError:
1884            raise service_error(service_error.internal,
1885                    "Access map and authorizer out of sync in " + \
1886                            "create_experiment for fedid %s"  % fid)
1887
1888        pid = "dummy"
1889        gid = "dummy"
1890
1891        req = req.get('NewRequestBody', None)
1892        if not req:
1893            raise service_error(service_error.req,
1894                    "Bad request format (no NewRequestBody)")
1895
1896        # Generate an ID for the experiment (slice) and a certificate that the
1897        # allocator can use to prove they own it.  We'll ship it back through
1898        # the encrypted connection.
1899        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1900
1901        #now we're done with the tmpdir, and it should be empty
1902        if self.cleanup:
1903            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1904            os.rmdir(tmpdir)
1905        else:
1906            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1907
1908        eid = self.create_experiment_state(fid, req, expid, expcert, 
1909                state='empty')
1910
1911        # Let users touch the state
1912        self.auth.set_attribute(fid, expid)
1913        self.auth.set_attribute(expid, expid)
1914        # Override fedids can manipulate state as well
1915        for o in self.overrides:
1916            self.auth.set_attribute(o, expid)
1917
1918        rv = {
1919                'experimentID': [
1920                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1921                ],
1922                'experimentStatus': 'empty',
1923                'experimentAccess': { 'X509' : expcert }
1924            }
1925
1926        return rv
1927
1928
1929    def create_experiment(self, req, fid):
1930        """
1931        The external interface to experiment creation called from the
1932        dispatcher.
1933
1934        Creates a working directory, splits the incoming description using the
1935        splitter script and parses out the avrious subsections using the
1936        lcasses above.  Once each sub-experiment is created, use pooled threads
1937        to instantiate them and start it all up.
1938        """
1939        if not self.auth.check_attribute(fid, 'create'):
1940            raise service_error(service_error.access, "Create access denied")
1941
1942        try:
1943            tmpdir = tempfile.mkdtemp(prefix="split-")
1944            os.mkdir(tmpdir+"/keys")
1945        except IOError:
1946            raise service_error(service_error.internal, "Cannot create tmp dir")
1947
1948        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1949        gw_secretkey_base = "fed.%s" % self.ssh_type
1950        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1951        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1952        tclfile = tmpdir + "/experiment.tcl"
1953        tbparams = { }
1954        try:
1955            access_user = self.accessdb[fid]
1956        except KeyError:
1957            raise service_error(service_error.internal,
1958                    "Access map and authorizer out of sync in " + \
1959                            "create_experiment for fedid %s"  % fid)
1960
1961        pid = "dummy"
1962        gid = "dummy"
1963
1964        req = req.get('CreateRequestBody', None)
1965        if not req:
1966            raise service_error(service_error.req,
1967                    "Bad request format (no CreateRequestBody)")
1968        # The tcl parser needs to read a file so put the content into that file
1969        descr=req.get('experimentdescription', None)
1970        if descr:
1971            file_content=descr.get('ns2description', None)
1972            if file_content:
1973                try:
1974                    f = open(tclfile, 'w')
1975                    f.write(file_content)
1976                    f.close()
1977                except IOError:
1978                    raise service_error(service_error.internal,
1979                            "Cannot write temp experiment description")
1980            else:
1981                raise service_error(service_error.req, 
1982                        "Only ns2descriptions supported")
1983        else:
1984            raise service_error(service_error.req, "No experiment description")
1985
1986        # Generate an ID for the experiment (slice) and a certificate that the
1987        # allocator can use to prove they own it.  We'll ship it back through
1988        # the encrypted connection.
1989        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1990
1991        eid = self.create_experiment_state(fid, req, expid, expcert)
1992        try: 
1993            # This catches exceptions to clear the placeholder if necessary
1994            try:
1995                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1996            except ValueError:
1997                raise service_error(service_error.server_config, 
1998                        "Bad key type (%s)" % self.ssh_type)
1999
2000            user = req.get('user', None)
2001            if user == None:
2002                raise service_error(service_error.req, "No user")
2003
2004            master = req.get('master', None)
2005            if not master:
2006                raise service_error(service_error.req,
2007                        "No master testbed label")
2008            export_project = req.get('exportProject', None)
2009            if not export_project:
2010                raise service_error(service_error.req, "No export project")
2011           
2012            # Translate to topdl
2013            if self.splitter_url:
2014                # XXX: need remote topdl translator
2015                self.log.debug("Calling remote splitter at %s" % \
2016                        self.splitter_url)
2017                split_data = self.remote_splitter(self.splitter_url,
2018                        file_content, master)
2019            else:
2020                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2021                    str(self.muxmax), '-m', master]
2022
2023                if self.fedkit:
2024                    tclcmd.append('-k')
2025
2026                if self.gatewaykit:
2027                    tclcmd.append('-K')
2028
2029                tclcmd.extend([pid, gid, eid, tclfile])
2030
2031                self.log.debug("running local splitter %s", " ".join(tclcmd))
2032                # This is just fantastic.  As a side effect the parser copies
2033                # tb_compat.tcl into the current directory, so that directory
2034                # must be writable by the fedd user.  Doing this in the
2035                # temporary subdir ensures this is the case.
2036                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2037                        cwd=tmpdir)
2038                split_data = tclparser.stdout
2039
2040            top = topdl.topology_from_xml(file=split_data, top="experiment")
2041
2042            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2043             # Find the testbeds to look up
2044            testbeds = set([ a.value for e in top.elements \
2045                    for a in e.attribute \
2046                    if a.attribute == 'testbed'] )
2047
2048            allocated = { }         # Testbeds we can access
2049            topo ={ }               # Sub topologies
2050            self.get_access_to_testbeds(testbeds, user, access_user, 
2051                    export_project, master, allocated, tbparams)
2052            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2053
2054            # Copy configuration files into the remote file store
2055            # The config urlpath
2056            configpath = "/%s/config" % expid
2057            # The config file system location
2058            configdir ="%s%s" % ( self.repodir, configpath)
2059            try:
2060                os.makedirs(configdir)
2061            except IOError, e:
2062                raise service_error(
2063                        "Cannot create config directory: %s" % e)
2064            try:
2065                f = open("%s/hosts" % configdir, "w")
2066                f.write('\n'.join(hosts))
2067                f.close()
2068            except IOError, e:
2069                raise service_error(service_error.internal, 
2070                        "Cannot write hosts file: %s" % e)
2071            try:
2072                copy_file("%s" % gw_pubkey, "%s/%s" % \
2073                        (configdir, gw_pubkey_base))
2074                copy_file("%s" % gw_secretkey, "%s/%s" % \
2075                        (configdir, gw_secretkey_base))
2076            except IOError, e:
2077                raise service_error(service_error.internal, 
2078                        "Cannot copy keyfiles: %s" % e)
2079
2080            # Allow the individual testbeds to access the configuration files.
2081            for tb in tbparams.keys():
2082                asignee = tbparams[tb]['allocID']['fedid']
2083                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2084                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2085
2086            self.add_portals(top, topo, eid, master, tbparams, ip_allocator)
2087            # Now get access to the dynamic testbeds
2088            for k, t in topo.items():
2089                if not t.get_attribute('dynamic'):
2090                    continue
2091                tb = t.get_attribute('testbed')
2092                if tb: 
2093                    self.get_access(tb, None, user, tbparams, master, 
2094                            export_project, access_user)
2095                    tbparams[k] = tbparams[tb]
2096                    del tbparams[tb]
2097                    allocated[k] = 1
2098                else:
2099                    raise service_error(service_error.internal, 
2100                            "Dynamic allocation from no testbed!?")
2101
2102            self.wrangle_software(expid, top, topo, tbparams)
2103
2104            vtopo = topdl.topology_to_vtopo(top)
2105            vis = self.genviz(vtopo)
2106
2107            # save federant information
2108            for k in allocated.keys():
2109                tbparams[k]['federant'] = {
2110                        'name': [ { 'localname' : eid} ],
2111                        'allocID' : tbparams[k]['allocID'],
2112                        'master' : k == master,
2113                        'uri': tbparams[k]['uri'],
2114                    }
2115                if tbparams[k].has_key('emulab'):
2116                        tbparams[k]['federant']['emulab'] = \
2117                                tbparams[k]['emulab']
2118
2119            self.state_lock.acquire()
2120            self.state[eid]['vtopo'] = vtopo
2121            self.state[eid]['vis'] = vis
2122            self.state[expid]['federant'] = \
2123                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2124                        if tbparams[tb].has_key('federant') ]
2125            if self.state_filename: 
2126                self.write_state()
2127            self.state_lock.release()
2128        except service_error, e:
2129            # If something goes wrong in the parse (usually an access error)
2130            # clear the placeholder state.  From here on out the code delays
2131            # exceptions.  Failing at this point returns a fault to the remote
2132            # caller.
2133
2134            self.state_lock.acquire()
2135            del self.state[eid]
2136            del self.state[expid]
2137            if self.state_filename: self.write_state()
2138            self.state_lock.release()
2139            raise e
2140
2141
2142        # Start the background swapper and return the starting state.  From
2143        # here on out, the state will stick around a while.
2144
2145        # Let users touch the state
2146        self.auth.set_attribute(fid, expid)
2147        self.auth.set_attribute(expid, expid)
2148        # Override fedids can manipulate state as well
2149        for o in self.overrides:
2150            self.auth.set_attribute(o, expid)
2151
2152        # Create a logger that logs to the experiment's state object as well as
2153        # to the main log file.
2154        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2155        alloc_collector = self.list_log(self.state[eid]['log'])
2156        h = logging.StreamHandler(alloc_collector)
2157        # XXX: there should be a global one of these rather than repeating the
2158        # code.
2159        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2160                    '%d %b %y %H:%M:%S'))
2161        alloc_log.addHandler(h)
2162       
2163        # XXX
2164        url_base = 'https://users.isi.deterlab.net:23235'
2165        attrs = [ 
2166                {
2167                    'attribute': 'ssh_pubkey', 
2168                    'value': '%s/%s/config/%s' % \
2169                            (url_base, expid, gw_pubkey_base)
2170                },
2171                {
2172                    'attribute': 'ssh_secretkey', 
2173                    'value': '%s/%s/config/%s' % \
2174                            (url_base, expid, gw_secretkey_base)
2175                },
2176                {
2177                    'attribute': 'hosts', 
2178                    'value': '%s/%s/config/hosts' % \
2179                            (url_base, expid)
2180                },
2181                {
2182                    'attribute': 'experiment_name',
2183                    'value': eid,
2184                },
2185            ]
2186
2187        # Start a thread to do the resource allocation
2188        t  = Thread(target=self.allocate_resources,
2189                args=(allocated, master, eid, expid, expcert, tbparams, 
2190                    topo, tmpdir, alloc_log, alloc_collector, attrs),
2191                name=eid)
2192        t.start()
2193
2194        rv = {
2195                'experimentID': [
2196                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2197                ],
2198                'experimentStatus': 'starting',
2199                'experimentAccess': { 'X509' : expcert }
2200            }
2201
2202        return rv
2203   
2204    def get_experiment_fedid(self, key):
2205        """
2206        find the fedid associated with the localname key in the state database.
2207        """
2208
2209        rv = None
2210        self.state_lock.acquire()
2211        if self.state.has_key(key):
2212            if isinstance(self.state[key], dict):
2213                try:
2214                    kl = [ f['fedid'] for f in \
2215                            self.state[key]['experimentID']\
2216                                if f.has_key('fedid') ]
2217                except KeyError:
2218                    self.state_lock.release()
2219                    raise service_error(service_error.internal, 
2220                            "No fedid for experiment %s when getting "+\
2221                                    "fedid(!?)" % key)
2222                if len(kl) == 1:
2223                    rv = kl[0]
2224                else:
2225                    self.state_lock.release()
2226                    raise service_error(service_error.internal, 
2227                            "multiple fedids for experiment %s when " +\
2228                                    "getting fedid(!?)" % key)
2229            else:
2230                self.state_lock.release()
2231                raise service_error(service_error.internal, 
2232                        "Unexpected state for %s" % key)
2233        self.state_lock.release()
2234        return rv
2235
2236    def check_experiment_access(self, fid, key):
2237        """
2238        Confirm that the fid has access to the experiment.  Though a request
2239        may be made in terms of a local name, the access attribute is always
2240        the experiment's fedid.
2241        """
2242        if not isinstance(key, fedid):
2243            key = self.get_experiment_fedid(key)
2244
2245        if self.auth.check_attribute(fid, key):
2246            return True
2247        else:
2248            raise service_error(service_error.access, "Access Denied")
2249
2250
2251    def get_handler(self, path, fid):
2252        if self.auth.check_attribute(fid, path):
2253            return ("%s/%s" % (self.repodir, path), "application/binary")
2254        else:
2255            return (None, None)
2256
2257    def get_vtopo(self, req, fid):
2258        """
2259        Return the stored virtual topology for this experiment
2260        """
2261        rv = None
2262        state = None
2263
2264        req = req.get('VtopoRequestBody', None)
2265        if not req:
2266            raise service_error(service_error.req,
2267                    "Bad request format (no VtopoRequestBody)")
2268        exp = req.get('experiment', None)
2269        if exp:
2270            if exp.has_key('fedid'):
2271                key = exp['fedid']
2272                keytype = "fedid"
2273            elif exp.has_key('localname'):
2274                key = exp['localname']
2275                keytype = "localname"
2276            else:
2277                raise service_error(service_error.req, "Unknown lookup type")
2278        else:
2279            raise service_error(service_error.req, "No request?")
2280
2281        self.check_experiment_access(fid, key)
2282
2283        self.state_lock.acquire()
2284        if self.state.has_key(key):
2285            if self.state[key].has_key('vtopo'):
2286                rv = { 'experiment' : {keytype: key },\
2287                        'vtopo': self.state[key]['vtopo'],\
2288                    }
2289            else:
2290                state = self.state[key]['experimentStatus']
2291        self.state_lock.release()
2292
2293        if rv: return rv
2294        else: 
2295            if state:
2296                raise service_error(service_error.partial, 
2297                        "Not ready: %s" % state)
2298            else:
2299                raise service_error(service_error.req, "No such experiment")
2300
2301    def get_vis(self, req, fid):
2302        """
2303        Return the stored visualization for this experiment
2304        """
2305        rv = None
2306        state = None
2307
2308        req = req.get('VisRequestBody', None)
2309        if not req:
2310            raise service_error(service_error.req,
2311                    "Bad request format (no VisRequestBody)")
2312        exp = req.get('experiment', None)
2313        if exp:
2314            if exp.has_key('fedid'):
2315                key = exp['fedid']
2316                keytype = "fedid"
2317            elif exp.has_key('localname'):
2318                key = exp['localname']
2319                keytype = "localname"
2320            else:
2321                raise service_error(service_error.req, "Unknown lookup type")
2322        else:
2323            raise service_error(service_error.req, "No request?")
2324
2325        self.check_experiment_access(fid, key)
2326
2327        self.state_lock.acquire()
2328        if self.state.has_key(key):
2329            if self.state[key].has_key('vis'):
2330                rv =  { 'experiment' : {keytype: key },\
2331                        'vis': self.state[key]['vis'],\
2332                        }
2333            else:
2334                state = self.state[key]['experimentStatus']
2335        self.state_lock.release()
2336
2337        if rv: return rv
2338        else:
2339            if state:
2340                raise service_error(service_error.partial, 
2341                        "Not ready: %s" % state)
2342            else:
2343                raise service_error(service_error.req, "No such experiment")
2344
2345    def clean_info_response(self, rv):
2346        """
2347        Remove the information in the experiment's state object that is not in
2348        the info response.
2349        """
2350        # Remove the owner info (should always be there, but...)
2351        if rv.has_key('owner'): del rv['owner']
2352
2353        # Convert the log into the allocationLog parameter and remove the
2354        # log entry (with defensive programming)
2355        if rv.has_key('log'):
2356            rv['allocationLog'] = "".join(rv['log'])
2357            del rv['log']
2358        else:
2359            rv['allocationLog'] = ""
2360
2361        if rv['experimentStatus'] != 'active':
2362            if rv.has_key('federant'): del rv['federant']
2363        else:
2364            # remove the allocationID and uri info from each federant
2365            for f in rv.get('federant', []):
2366                if f.has_key('allocID'): del f['allocID']
2367                if f.has_key('uri'): del f['uri']
2368        return rv
2369
2370    def get_info(self, req, fid):
2371        """
2372        Return all the stored info about this experiment
2373        """
2374        rv = None
2375
2376        req = req.get('InfoRequestBody', None)
2377        if not req:
2378            raise service_error(service_error.req,
2379                    "Bad request format (no InfoRequestBody)")
2380        exp = req.get('experiment', None)
2381        if exp:
2382            if exp.has_key('fedid'):
2383                key = exp['fedid']
2384                keytype = "fedid"
2385            elif exp.has_key('localname'):
2386                key = exp['localname']
2387                keytype = "localname"
2388            else:
2389                raise service_error(service_error.req, "Unknown lookup type")
2390        else:
2391            raise service_error(service_error.req, "No request?")
2392
2393        self.check_experiment_access(fid, key)
2394
2395        # The state may be massaged by the service function that called
2396        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2397        # state.
2398        self.state_lock.acquire()
2399        if self.state.has_key(key):
2400            rv = copy.deepcopy(self.state[key])
2401        self.state_lock.release()
2402
2403        if rv:
2404            return self.clean_info_response(rv)
2405        else:
2406            raise service_error(service_error.req, "No such experiment")
2407
2408    def get_multi_info(self, req, fid):
2409        """
2410        Return all the stored info that this fedid can access
2411        """
2412        rv = { 'info': [ ] }
2413
2414        self.state_lock.acquire()
2415        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2416            try:
2417                self.check_experiment_access(fid, key)
2418            except service_error, e:
2419                if e.code == service_error.access:
2420                    continue
2421                else:
2422                    self.state_lock.release()
2423                    raise e
2424
2425            if self.state.has_key(key):
2426                e = copy.deepcopy(self.state[key])
2427                e = self.clean_info_response(e)
2428                rv['info'].append(e)
2429        self.state_lock.release()
2430        return rv
2431
2432    def terminate_experiment(self, req, fid):
2433        """
2434        Swap this experiment out on the federants and delete the shared
2435        information
2436        """
2437        tbparams = { }
2438        req = req.get('TerminateRequestBody', None)
2439        if not req:
2440            raise service_error(service_error.req,
2441                    "Bad request format (no TerminateRequestBody)")
2442        force = req.get('force', False)
2443        exp = req.get('experiment', None)
2444        if exp:
2445            if exp.has_key('fedid'):
2446                key = exp['fedid']
2447                keytype = "fedid"
2448            elif exp.has_key('localname'):
2449                key = exp['localname']
2450                keytype = "localname"
2451            else:
2452                raise service_error(service_error.req, "Unknown lookup type")
2453        else:
2454            raise service_error(service_error.req, "No request?")
2455
2456        self.check_experiment_access(fid, key)
2457
2458        dealloc_list = [ ]
2459
2460
2461        # Create a logger that logs to the dealloc_list as well as to the main
2462        # log file.
2463        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2464        h = logging.StreamHandler(self.list_log(dealloc_list))
2465        # XXX: there should be a global one of these rather than repeating the
2466        # code.
2467        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2468                    '%d %b %y %H:%M:%S'))
2469        dealloc_log.addHandler(h)
2470
2471        self.state_lock.acquire()
2472        fed_exp = self.state.get(key, None)
2473
2474        if fed_exp:
2475            # This branch of the conditional holds the lock to generate a
2476            # consistent temporary tbparams variable to deallocate experiments.
2477            # It releases the lock to do the deallocations and reacquires it to
2478            # remove the experiment state when the termination is complete.
2479
2480            # First make sure that the experiment creation is complete.
2481            status = fed_exp.get('experimentStatus', None)
2482
2483            if status:
2484                if status in ('starting', 'terminating'):
2485                    if not force:
2486                        self.state_lock.release()
2487                        raise service_error(service_error.partial, 
2488                                'Experiment still being created or destroyed')
2489                    else:
2490                        self.log.warning('Experiment in %s state ' % status + \
2491                                'being terminated by force.')
2492            else:
2493                # No status??? trouble
2494                self.state_lock.release()
2495                raise service_error(service_error.internal,
2496                        "Experiment has no status!?")
2497
2498            ids = []
2499            #  experimentID is a list of dicts that are self-describing
2500            #  identifiers.  This finds all the fedids and localnames - the
2501            #  keys of self.state - and puts them into ids.
2502            for id in fed_exp.get('experimentID', []):
2503                if id.has_key('fedid'): ids.append(id['fedid'])
2504                if id.has_key('localname'): ids.append(id['localname'])
2505
2506            # Collect the allocation/segment ids into a dict keyed by the fedid
2507            # of the allocation (or a monotonically increasing integer) that
2508            # contains a tuple of uri, aid (which is a dict...)
2509            for i, fed in enumerate(fed_exp.get('federant', [])):
2510                try:
2511                    uri = fed['uri']
2512                    aid = fed['allocID']
2513                    k = fed['allocID'].get('fedid', i)
2514                except KeyError, e:
2515                    continue
2516                tbparams[k] = (uri, aid)
2517            fed_exp['experimentStatus'] = 'terminating'
2518            if self.state_filename: self.write_state()
2519            self.state_lock.release()
2520
2521            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2522            # then completes, so we can't wait if nothing starts.  So, no
2523            # tbparams, no start.
2524            if len(tbparams) > 0:
2525                thread_pool = self.thread_pool(self.nthreads)
2526                for k in tbparams.keys():
2527                    # Create and start a thread to stop the segment
2528                    thread_pool.wait_for_slot()
2529                    uri, aid = tbparams[k]
2530                    t  = self.pooled_thread(\
2531                            target=self.terminate_segment(log=dealloc_log,
2532                                testbed=uri,
2533                                cert_file=self.cert_file, 
2534                                cert_pwd=self.cert_pwd,
2535                                trusted_certs=self.trusted_certs,
2536                                caller=self.call_TerminateSegment),
2537                            args=(uri, aid), name=k,
2538                            pdata=thread_pool, trace_file=self.trace_file)
2539                    t.start()
2540                # Wait for completions
2541                thread_pool.wait_for_all_done()
2542
2543            # release the allocations (failed experiments have done this
2544            # already, and starting experiments may be in odd states, so we
2545            # ignore errors releasing those allocations
2546            try: 
2547                for k in tbparams.keys():
2548                    # This releases access by uri
2549                    uri, aid = tbparams[k]
2550                    self.release_access(None, aid, uri=uri)
2551            except service_error, e:
2552                if status != 'failed' and not force:
2553                    raise e
2554
2555            # Remove the terminated experiment
2556            self.state_lock.acquire()
2557            for id in ids:
2558                if self.state.has_key(id): del self.state[id]
2559
2560            if self.state_filename: self.write_state()
2561            self.state_lock.release()
2562
2563            return { 
2564                    'experiment': exp , 
2565                    'deallocationLog': "".join(dealloc_list),
2566                    }
2567        else:
2568            # Don't forget to release the lock
2569            self.state_lock.release()
2570            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.