source: fedd/federation/experiment_control.py @ c0a8738

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since c0a8738 was d87778f, checked in by Ted Faber <faber@…>, 15 years ago

Some bugs that weren't shaken out. Wrong peer in the config file, active configs not marked, and startcmds not munged.

  • Property mode set to 100644
File size: 88.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221        self.repo_url = config.get("experiment_control", "repo_url", 
222                "https://users.isi.deterlab.net:23235");
223
224        self.exp_stem = "fed-stem"
225        self.log = logging.getLogger("fedd.experiment_control")
226        set_log_level(config, "experiment_control", self.log)
227        self.muxmax = 2
228        self.nthreads = 2
229        self.randomize_experiments = False
230
231        self.splitter = None
232        self.ssh_keygen = "/usr/bin/ssh-keygen"
233        self.ssh_identity_file = None
234
235
236        self.debug = config.getboolean("experiment_control", "create_debug")
237        self.cleanup = not config.getboolean("experiment_control", 
238                "leave_tmpfiles")
239        self.state_filename = config.get("experiment_control", 
240                "experiment_state")
241        self.splitter_url = config.get("experiment_control", "splitter_uri")
242        self.fedkit = parse_tarfile_list(\
243                config.get("experiment_control", "fedkit"))
244        self.gatewaykit = parse_tarfile_list(\
245                config.get("experiment_control", "gatewaykit"))
246        accessdb_file = config.get("experiment_control", "accessdb")
247
248        self.ssh_pubkey_file = config.get("experiment_control", 
249                "ssh_pubkey_file")
250        self.ssh_privkey_file = config.get("experiment_control",
251                "ssh_privkey_file")
252        # NB for internal master/slave ops, not experiment setup
253        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
254
255        self.overrides = set([])
256        ovr = config.get('experiment_control', 'overrides')
257        if ovr:
258            for o in ovr.split(","):
259                o = o.strip()
260                if o.startswith('fedid:'): o = o[len('fedid:'):]
261                self.overrides.add(fedid(hexstr=o))
262
263        self.state = { }
264        self.state_lock = Lock()
265        self.tclsh = "/usr/local/bin/otclsh"
266        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
267                config.get("experiment_control", "tcl_splitter",
268                        "/usr/testbed/lib/ns2ir/parse.tcl")
269        mapdb_file = config.get("experiment_control", "mapdb")
270        self.trace_file = sys.stderr
271
272        self.def_expstart = \
273                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
274                "/tmp/federate";
275        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
276                "FEDDIR/hosts";
277        self.def_gwstart = \
278                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
279                "/tmp/bridge.log";
280        self.def_mgwstart = \
281                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
282                "/tmp/bridge.log";
283        self.def_gwimage = "FBSD61-TUNNEL2";
284        self.def_gwtype = "pc";
285        self.local_access = { }
286
287        if auth:
288            self.auth = auth
289        else:
290            self.log.error(\
291                    "[access]: No authorizer initialized, creating local one.")
292            auth = authorizer()
293
294
295        if self.ssh_pubkey_file:
296            try:
297                f = open(self.ssh_pubkey_file, 'r')
298                self.ssh_pubkey = f.read()
299                f.close()
300            except IOError:
301                raise service_error(service_error.internal,
302                        "Cannot read sshpubkey")
303        else:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307        if not self.ssh_privkey_file:
308            raise service_error(service_error.internal, 
309                    "No SSH public key file?")
310
311
312        if mapdb_file:
313            self.read_mapdb(mapdb_file)
314        else:
315            self.log.warn("[experiment_control] No testbed map, using defaults")
316            self.tbmap = { 
317                    'deter':'https://users.isi.deterlab.net:23235',
318                    'emulab':'https://users.isi.deterlab.net:23236',
319                    'ucb':'https://users.isi.deterlab.net:23237',
320                    }
321
322        if accessdb_file:
323                self.read_accessdb(accessdb_file)
324        else:
325            raise service_error(service_error.internal,
326                    "No accessdb specified in config")
327
328        # Grab saved state.  OK to do this w/o locking because it's read only
329        # and only one thread should be in existence that can see self.state at
330        # this point.
331        if self.state_filename:
332            self.read_state()
333
334        # Dispatch tables
335        self.soap_services = {\
336                'New': soap_handler('New', self.new_experiment),
337                'Create': soap_handler('Create', self.create_experiment),
338                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
339                'Vis': soap_handler('Vis', self.get_vis),
340                'Info': soap_handler('Info', self.get_info),
341                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
342                'Terminate': soap_handler('Terminate', 
343                    self.terminate_experiment),
344        }
345
346        self.xmlrpc_services = {\
347                'New': xmlrpc_handler('New', self.new_experiment),
348                'Create': xmlrpc_handler('Create', self.create_experiment),
349                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
350                'Vis': xmlrpc_handler('Vis', self.get_vis),
351                'Info': xmlrpc_handler('Info', self.get_info),
352                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
353                'Terminate': xmlrpc_handler('Terminate',
354                    self.terminate_experiment),
355        }
356
357    # Call while holding self.state_lock
358    def write_state(self):
359        """
360        Write a new copy of experiment state after copying the existing state
361        to a backup.
362
363        State format is a simple pickling of the state dictionary.
364        """
365        if os.access(self.state_filename, os.W_OK):
366            copy_file(self.state_filename, \
367                    "%s.bak" % self.state_filename)
368        try:
369            f = open(self.state_filename, 'w')
370            pickle.dump(self.state, f)
371        except IOError, e:
372            self.log.error("Can't write file %s: %s" % \
373                    (self.state_filename, e))
374        except pickle.PicklingError, e:
375            self.log.error("Pickling problem: %s" % e)
376        except TypeError, e:
377            self.log.error("Pickling problem (TypeError): %s" % e)
378
379    # Call while holding self.state_lock
380    def read_state(self):
381        """
382        Read a new copy of experiment state.  Old state is overwritten.
383
384        State format is a simple pickling of the state dictionary.
385        """
386       
387        def get_experiment_id(state):
388            """
389            Pull the fedid experimentID out of the saved state.  This is kind
390            of a gross walk through the dict.
391            """
392
393            if state.has_key('experimentID'):
394                for e in state['experimentID']:
395                    if e.has_key('fedid'):
396                        return e['fedid']
397                else:
398                    return None
399            else:
400                return None
401
402        def get_alloc_ids(state):
403            """
404            Pull the fedids of the identifiers of each allocation from the
405            state.  Again, a dict dive that's best isolated.
406            """
407
408            return [ f['allocID']['fedid'] 
409                    for f in state.get('federant',[]) \
410                        if f.has_key('allocID') and \
411                            f['allocID'].has_key('fedid')]
412
413
414        try:
415            f = open(self.state_filename, "r")
416            self.state = pickle.load(f)
417            self.log.debug("[read_state]: Read state from %s" % \
418                    self.state_filename)
419        except IOError, e:
420            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
421                    % (self.state_filename, e))
422        except pickle.UnpicklingError, e:
423            self.log.warning(("[read_state]: No saved state: " + \
424                    "Unpickling failed: %s") % e)
425       
426        for s in self.state.values():
427            try:
428
429                eid = get_experiment_id(s)
430                if eid : 
431                    # Give the owner rights to the experiment
432                    self.auth.set_attribute(s['owner'], eid)
433                    # And holders of the eid as well
434                    self.auth.set_attribute(eid, eid)
435                    # allow overrides to control experiments as well
436                    for o in self.overrides:
437                        self.auth.set_attribute(o, eid)
438                    # Set permissions to allow reading of the software repo, if
439                    # any, as well.
440                    for a in get_alloc_ids(s):
441                        self.auth.set_attribute(a, 'repo/%s' % eid)
442                else:
443                    raise KeyError("No experiment id")
444            except KeyError, e:
445                self.log.warning("[read_state]: State ownership or identity " +\
446                        "misformatted in %s: %s" % (self.state_filename, e))
447
448
449    def read_accessdb(self, accessdb_file):
450        """
451        Read the mapping from fedids that can create experiments to their name
452        in the 3-level access namespace.  All will be asserted from this
453        testbed and can include the local username and porject that will be
454        asserted on their behalf by this fedd.  Each fedid is also added to the
455        authorization system with the "create" attribute.
456        """
457        self.accessdb = {}
458        # These are the regexps for parsing the db
459        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
460        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
461                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
462        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
463                "\s*->\s*(" + name_expr + ")\s*$")
464        lineno = 0
465
466        # Parse the mappings and store in self.authdb, a dict of
467        # fedid -> (proj, user)
468        try:
469            f = open(accessdb_file, "r")
470            for line in f:
471                lineno += 1
472                line = line.strip()
473                if len(line) == 0 or line.startswith('#'):
474                    continue
475                m = project_line.match(line)
476                if m:
477                    fid = fedid(hexstr=m.group(1))
478                    project, user = m.group(2,3)
479                    if not self.accessdb.has_key(fid):
480                        self.accessdb[fid] = []
481                    self.accessdb[fid].append((project, user))
482                    continue
483
484                m = user_line.match(line)
485                if m:
486                    fid = fedid(hexstr=m.group(1))
487                    project = None
488                    user = m.group(2)
489                    if not self.accessdb.has_key(fid):
490                        self.accessdb[fid] = []
491                    self.accessdb[fid].append((project, user))
492                    continue
493                self.log.warn("[experiment_control] Error parsing access " +\
494                        "db %s at line %d" %  (accessdb_file, lineno))
495        except IOError:
496            raise service_error(service_error.internal,
497                    "Error opening/reading %s as experiment " +\
498                            "control accessdb" %  accessdb_file)
499        f.close()
500
501        # Initialize the authorization attributes
502        for fid in self.accessdb.keys():
503            self.auth.set_attribute(fid, 'create')
504            self.auth.set_attribute(fid, 'new')
505
506    def read_mapdb(self, file):
507        """
508        Read a simple colon separated list of mappings for the
509        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
510        """
511
512        self.tbmap = { }
513        lineno =0
514        try:
515            f = open(file, "r")
516            for line in f:
517                lineno += 1
518                line = line.strip()
519                if line.startswith('#') or len(line) == 0:
520                    continue
521                try:
522                    label, url = line.split(':', 1)
523                    self.tbmap[label] = url
524                except ValueError, e:
525                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
526                            "map db: %s %s" % (lineno, line, e))
527        except IOError, e:
528            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
529                    "open %s: %s" % (file, e))
530        f.close()
531       
532    def generate_ssh_keys(self, dest, type="rsa" ):
533        """
534        Generate a set of keys for the gateways to use to talk.
535
536        Keys are of type type and are stored in the required dest file.
537        """
538        valid_types = ("rsa", "dsa")
539        t = type.lower();
540        if t not in valid_types: raise ValueError
541        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
542
543        try:
544            trace = open("/dev/null", "w")
545        except IOError:
546            raise service_error(service_error.internal,
547                    "Cannot open /dev/null??");
548
549        # May raise CalledProcessError
550        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
551        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
552        if rv != 0:
553            raise service_error(service_error.internal, 
554                    "Cannot generate nonce ssh keys.  %s return code %d" \
555                            % (self.ssh_keygen, rv))
556
557    def gentopo(self, str):
558        """
559        Generate the topology dtat structure from the splitter's XML
560        representation of it.
561
562        The topology XML looks like:
563            <experiment>
564                <nodes>
565                    <node><vname></vname><ips>ip1:ip2</ips></node>
566                </nodes>
567                <lans>
568                    <lan>
569                        <vname></vname><vnode></vnode><ip></ip>
570                        <bandwidth></bandwidth><member>node:port</member>
571                    </lan>
572                </lans>
573        """
574        class topo_parse:
575            """
576            Parse the topology XML and create the dats structure.
577            """
578            def __init__(self):
579                # Typing of the subelements for data conversion
580                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
581                self.int_subelements = ( 'bandwidth',)
582                self.float_subelements = ( 'delay',)
583                # The final data structure
584                self.nodes = [ ]
585                self.lans =  [ ]
586                self.topo = { \
587                        'node': self.nodes,\
588                        'lan' : self.lans,\
589                    }
590                self.element = { }  # Current element being created
591                self.chars = ""     # Last text seen
592
593            def end_element(self, name):
594                # After each sub element the contents is added to the current
595                # element or to the appropriate list.
596                if name == 'node':
597                    self.nodes.append(self.element)
598                    self.element = { }
599                elif name == 'lan':
600                    self.lans.append(self.element)
601                    self.element = { }
602                elif name in self.str_subelements:
603                    self.element[name] = self.chars
604                    self.chars = ""
605                elif name in self.int_subelements:
606                    self.element[name] = int(self.chars)
607                    self.chars = ""
608                elif name in self.float_subelements:
609                    self.element[name] = float(self.chars)
610                    self.chars = ""
611
612            def found_chars(self, data):
613                self.chars += data.rstrip()
614
615
616        tp = topo_parse();
617        parser = xml.parsers.expat.ParserCreate()
618        parser.EndElementHandler = tp.end_element
619        parser.CharacterDataHandler = tp.found_chars
620
621        parser.Parse(str)
622
623        return tp.topo
624       
625
626    def genviz(self, topo):
627        """
628        Generate the visualization the virtual topology
629        """
630
631        neato = "/usr/local/bin/neato"
632        # These are used to parse neato output and to create the visualization
633        # file.
634        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
635        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
636                "%s</type></node>"
637
638        try:
639            # Node names
640            nodes = [ n['vname'] for n in topo['node'] ]
641            topo_lans = topo['lan']
642        except KeyError, e:
643            raise service_error(service_error.internal, "Bad topology: %s" %e)
644
645        lans = { }
646        links = { }
647
648        # Walk through the virtual topology, organizing the connections into
649        # 2-node connections (links) and more-than-2-node connections (lans).
650        # When a lan is created, it's added to the list of nodes (there's a
651        # node in the visualization for the lan).
652        for l in topo_lans:
653            if links.has_key(l['vname']):
654                if len(links[l['vname']]) < 2:
655                    links[l['vname']].append(l['vnode'])
656                else:
657                    nodes.append(l['vname'])
658                    lans[l['vname']] = links[l['vname']]
659                    del links[l['vname']]
660                    lans[l['vname']].append(l['vnode'])
661            elif lans.has_key(l['vname']):
662                lans[l['vname']].append(l['vnode'])
663            else:
664                links[l['vname']] = [ l['vnode'] ]
665
666
667        # Open up a temporary file for dot to turn into a visualization
668        try:
669            df, dotname = tempfile.mkstemp()
670            dotfile = os.fdopen(df, 'w')
671        except IOError:
672            raise service_error(service_error.internal,
673                    "Failed to open file in genviz")
674
675        try:
676            dnull = open('/dev/null', 'w')
677        except IOError:
678            service_error(service_error.internal,
679                    "Failed to open /dev/null in genviz")
680
681        # Generate a dot/neato input file from the links, nodes and lans
682        try:
683            print >>dotfile, "graph G {"
684            for n in nodes:
685                print >>dotfile, '\t"%s"' % n
686            for l in links.keys():
687                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
688            for l in lans.keys():
689                for n in lans[l]:
690                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
691            print >>dotfile, "}"
692            dotfile.close()
693        except TypeError:
694            raise service_error(service_error.internal,
695                    "Single endpoint link in vtopo")
696        except IOError:
697            raise service_error(service_error.internal, "Cannot write dot file")
698
699        # Use dot to create a visualization
700        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
701                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
702                close_fds=True)
703        dnull.close()
704
705        # Translate dot to vis format
706        vis_nodes = [ ]
707        vis = { 'node': vis_nodes }
708        for line in dot.stdout:
709            m = vis_re.match(line)
710            if m:
711                vn = m.group(1)
712                vis_node = {'name': vn, \
713                        'x': float(m.group(2)),\
714                        'y' : float(m.group(3)),\
715                    }
716                if vn in links.keys() or vn in lans.keys():
717                    vis_node['type'] = 'lan'
718                else:
719                    vis_node['type'] = 'node'
720                vis_nodes.append(vis_node)
721        rv = dot.wait()
722
723        os.remove(dotname)
724        if rv == 0 : return vis
725        else: return None
726
727    def get_access(self, tb, nodes, tbparam, master, export_project,
728            access_user, services):
729        """
730        Get access to testbed through fedd and set the parameters for that tb
731        """
732        uri = self.tbmap.get(tb, None)
733        if not uri:
734            raise service_error(serice_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        # Tweak search order so that if there are entries in access_user that
738        # have a project matching the export project, we try them first
739        if export_project and export_project.has_key('localname'): 
740            pn = export_project['localname'] 
741               
742            access_sequence = [ (p, u) for p, u in access_user if p == pn] 
743            access_sequence.extend([(p, u) for p, u in access_user if p != pn]) 
744        else: 
745            access_sequence = access_user
746
747        for p, u in access_sequence: 
748            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
749                    "to %s") %  ((p or "None"), u, uri))
750
751            if p:
752                # Request with user and project specified
753                req = {\
754                        'destinationTestbed' : { 'uri' : uri },
755                        'credential': [ "project: %s" % p, "user: %s"  % u],
756                        'allocID' : { 'localname': 'test' },
757                    }
758            else:
759                # Request with only user specified
760                req = {\
761                        'destinationTestbed' : { 'uri' : uri },
762                        'credential': [ 'user: %s' % u ],
763                        'user':  [ {'userID': { 'localname': u } } ],
764                        'allocID' : { 'localname': 'test' },
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771                req['service'] = [
772                        { 'name': 'userconfig', 'visibility': 'export'},
773                        { 'name': 'SMB', 'visibility': 'export'},
774                        { 'name': 'seer', 'visibility': 'export'},
775                        { 'name': 'tmcd', 'visibility': 'export'},
776                    ]
777
778            # node resources if any
779            if nodes != None and len(nodes) > 0:
780                rnodes = [ ]
781                for n in nodes:
782                    rn = { }
783                    image, hw, count = n.split(":")
784                    if image: rn['image'] = [ image ]
785                    if hw: rn['hardware'] = [ hw ]
786                    if count and int(count) >0 : rn['count'] = int(count)
787                    rnodes.append(rn)
788                req['resources']= { }
789                req['resources']['node'] = rnodes
790
791            try:
792                if self.local_access.has_key(uri):
793                    # Local access call
794                    req = { 'RequestAccessRequestBody' : req }
795                    r = self.local_access[uri].RequestAccess(req, 
796                            fedid(file=self.cert_file))
797                    r = { 'RequestAccessResponseBody' : r }
798                else:
799                    r = self.call_RequestAccess(uri, req, 
800                            self.cert_file, self.cert_pwd, self.trusted_certs)
801            except service_error, e:
802                if e.code == service_error.access:
803                    self.log.debug("[get_access] Access denied")
804                    r = None
805                    continue
806                else:
807                    raise e
808
809            if r.has_key('RequestAccessResponseBody'):
810                # Through to here we have a valid response, not a fault.
811                # Access denied is a fault, so something better or worse than
812                # access denied has happened.
813                r = r['RequestAccessResponseBody']
814                self.log.debug("[get_access] Access granted")
815                break
816            else:
817                raise service_error(service_error.protocol,
818                        "Bad proxy response")
819       
820        if not r:
821            raise service_error(service_error.access, 
822                    "Access denied by %s (%s)" % (tb, uri))
823
824        tbparam[tb] = { 
825                "allocID" : r['allocID'],
826                "uri": uri,
827                }
828        if 'service' in r: 
829            services.extend(r['service'])
830
831        # Add attributes to parameter space.  We don't allow attributes to
832        # overlay any parameters already installed.
833        for a in r['fedAttr']:
834            try:
835                if a['attribute'] and \
836                        isinstance(a['attribute'], basestring)\
837                        and not tbparam[tb].has_key(a['attribute'].lower()):
838                    tbparam[tb][a['attribute'].lower()] = a['value']
839            except KeyError:
840                self.log.error("Bad attribute in response: %s" % a)
841
842    def release_access(self, tb, aid, uri=None):
843        """
844        Release access to testbed through fedd
845        """
846
847        if not uri:
848            uri = self.tbmap.get(tb, None)
849        if not uri:
850            raise service_error(service_error.server_config, 
851                    "Unknown testbed: %s" % tb)
852
853        if self.local_access.has_key(uri):
854            resp = self.local_access[uri].ReleaseAccess(\
855                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
856                    fedid(file=self.cert_file))
857            resp = { 'ReleaseAccessResponseBody': resp } 
858        else:
859            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
860                    self.cert_file, self.cert_pwd, self.trusted_certs)
861
862        # better error coding
863
864    def remote_splitter(self, uri, desc, master):
865
866        req = {
867                'description' : { 'ns2description': desc },
868                'master': master,
869                'include_fedkit': bool(self.fedkit),
870                'include_gatewaykit': bool(self.gatewaykit)
871            }
872
873        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
874                self.trusted_certs)
875
876        if r.has_key('Ns2SplitResponseBody'):
877            r = r['Ns2SplitResponseBody']
878            if r.has_key('output'):
879                return r['output'].splitlines()
880            else:
881                raise service_error(service_error.protocol, 
882                        "Bad splitter response (no output)")
883        else:
884            raise service_error(service_error.protocol, "Bad splitter response")
885
886    class start_segment:
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None,
889                log_collector=None):
890            self.log = log
891            self.debug = debug
892            self.cert_file = cert_file
893            self.cert_pwd = cert_pwd
894            self.trusted_certs = None
895            self.caller = caller
896            self.testbed = testbed
897            self.log_collector = log_collector
898            self.response = None
899
900        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
901                services=None):
902            req = {
903                    'allocID': { 'fedid' : aid }, 
904                    'segmentdescription': { 
905                        'topdldescription': topo.to_dict(),
906                    },
907                    'master': master,
908                }
909
910            if connInfo:
911                req['connection'] = connInfo
912            # Add services to request.  The master exports, everyone else
913            # imports.
914            if services:
915                svcs = [ x.copy() for x in services]
916                for s in svcs:
917                    if master: s['visibility'] = 'export'
918                    else: s['visibility'] = 'import'
919                req['service'] = svcs
920            if attrs:
921                req['fedAttr'] = attrs
922
923            try:
924                self.log.debug("Calling StartSegment at %s " % uri)
925                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
926                        self.trusted_certs)
927                if r.has_key('StartSegmentResponseBody'):
928                    lval = r['StartSegmentResponseBody'].get('allocationLog',
929                            None)
930                    if lval and self.log_collector:
931                        for line in  lval.splitlines(True):
932                            self.log_collector.write(line)
933                    self.response = r
934                else:
935                    raise service_error(service_error.internal, 
936                            "Bad response!?: %s" %r)
937                return True
938            except service_error, e:
939                self.log.error("Start segment failed on %s: %s" % \
940                        (self.testbed, e))
941                return False
942
943
944
945    class terminate_segment:
946        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
947                cert_pwd=None, trusted_certs=None, caller=None):
948            self.log = log
949            self.debug = debug
950            self.cert_file = cert_file
951            self.cert_pwd = cert_pwd
952            self.trusted_certs = None
953            self.caller = caller
954            self.testbed = testbed
955
956        def __call__(self, uri, aid ):
957            req = {
958                    'allocID': aid , 
959                }
960            try:
961                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
962                        self.trusted_certs)
963                return True
964            except service_error, e:
965                self.log.error("Terminate segment failed on %s: %s" % \
966                        (self.testbed, e))
967                return False
968   
969
970    def allocate_resources(self, allocated, master, eid, expid, 
971            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
972            attrs=None, connInfo={}, services=[]):
973        def get_vlan(r):
974            if r.has_key('StartSegmentResponseBody'):
975                srb = r['StartSegmentResponseBody']
976                if srb.has_key('fedAttr'):
977                    for k, v in [ (a['attribute'], a['value']) \
978                            for a in srb['fedAttr']]:
979                        if k == 'vlan': return v
980            return None
981
982        started = { }           # Testbeds where a sub-experiment started
983                                # successfully
984
985        # XXX
986        fail_soft = False
987
988        slaves = [ k for k in allocated.keys() \
989                if k != master and not topo[k].get_attribute('transit')]
990        transit = [ k for k in allocated.keys() \
991                if topo[k].get_attribute('transit')]
992
993        log = alloc_log or self.log
994
995        thread_pool = self.thread_pool(self.nthreads)
996        threads = [ ]
997
998        for tb in transit:
999            uri = tbparams[tb]['uri']
1000            if tbparams[tb].has_key('allocID') and \
1001                    tbparams[tb]['allocID'].has_key('fedid'):
1002                aid = tbparams[tb]['allocID']['fedid']
1003            else:
1004                raise service_error(service_error.internal, 
1005                        "No alloc id for testbed %s !?" % tb)
1006
1007            m = re.search('(\d+)', tb)
1008            if m:
1009                to_repl = "unassigned%s" % m.group(1)
1010            else:
1011                raise service_error(service_error.internal, 
1012                        "Bad dynamic allocation name")
1013                break
1014
1015            ss = self.start_segment(log=log, debug=self.debug, 
1016                testbed=tb, cert_file=self.cert_file, 
1017                cert_pwd=self.cert_pwd, 
1018                trusted_certs=self.trusted_certs,
1019                caller=self.call_StartSegment,
1020                log_collector=log_collector)
1021            t = self.pooled_thread(
1022                    target=ss,
1023                    args =(uri, aid, topo[tb], False, attrs, connInfo[tb],
1024                        services),
1025                    name=tb, pdata=thread_pool, trace_file=self.trace_file)
1026            threads.append(t)
1027            t.start()
1028            # Wait until the this transit node finishes (keep pinging the log,
1029            # though)
1030
1031            mins = 0
1032            while not thread_pool.wait_for_all_done(60.0):
1033                mins += 1
1034                alloc_log.info("Waiting for transit (it has been %d mins)" \
1035                        % mins)
1036
1037            if t.rv:
1038                vlan = get_vlan(ss.response)
1039                if vlan is not None:
1040                    for k, t in topo.items():
1041                        for e in t.elements:
1042                            for i in e.interface:
1043                                vl = i.get_attribute('dragon_vlan')
1044                                if vl is not None and vl == to_repl:
1045                                    i.set_attribute('dragon_vlan', vlan)
1046            else:
1047                break
1048            thread_pool.clear()
1049
1050
1051        failed = [ t.getName() for t in threads if not t.rv ]
1052
1053        if len(failed) == 0:
1054            for tb in slaves:
1055                # Create and start a thread to start the segment, and save it
1056                # to get the return value later
1057                thread_pool.wait_for_slot()
1058                uri = self.tbmap.get(tb, None)
1059                if not uri:
1060                    raise service_error(service_error.internal, 
1061                            "Unknown testbed %s !?" % tb)
1062
1063                if tbparams[tb].has_key('allocID') and \
1064                        tbparams[tb]['allocID'].has_key('fedid'):
1065                    aid = tbparams[tb]['allocID']['fedid']
1066                else:
1067                    raise service_error(service_error.internal, 
1068                            "No alloc id for testbed %s !?" % tb)
1069
1070                t  = self.pooled_thread(\
1071                        target=self.start_segment(log=log, debug=self.debug,
1072                            testbed=tb, cert_file=self.cert_file,
1073                            cert_pwd=self.cert_pwd,
1074                            trusted_certs=self.trusted_certs,
1075                            caller=self.call_StartSegment,
1076                            log_collector=log_collector), 
1077                        args=(uri, aid, topo[tb], False, attrs, connInfo[tb],
1078                            services),
1079                        name=tb,
1080                        pdata=thread_pool, trace_file=self.trace_file)
1081                threads.append(t)
1082                t.start()
1083
1084            # Wait until all finish (keep pinging the log, though)
1085            mins = 0
1086            while not thread_pool.wait_for_all_done(60.0):
1087                mins += 1
1088                alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                        % mins)
1090
1091            thread_pool.clear()
1092
1093        # If none failed, start the master
1094        failed = [ t.getName() for t in threads if not t.rv ]
1095
1096        if len(failed) == 0:
1097            uri = self.tbmap.get(master, None)
1098            if not uri:
1099                raise service_error(service_error.internal, 
1100                        "Unknown testbed %s !?" % master)
1101
1102            if tbparams[master].has_key('allocID') and \
1103                    tbparams[master]['allocID'].has_key('fedid'):
1104                aid = tbparams[master]['allocID']['fedid']
1105            else:
1106                raise service_error(service_error.internal, 
1107                    "No alloc id for testbed %s !?" % master)
1108            t = self.pooled_thread(
1109                    target=self.start_segment(log=log, debug=self.debug, 
1110                        testbed=master, cert_file=self.cert_file, 
1111                        cert_pwd=self.cert_pwd, 
1112                        trusted_certs=self.trusted_certs,
1113                        caller=self.call_StartSegment,
1114                        log_collector=log_collector),
1115                    args =(uri, aid, topo[master], True, attrs, 
1116                        connInfo[master], services),
1117                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1118            threads.append(t)
1119            t.start()
1120            # Wait until the master finishes (keep pinging the log, though)
1121            mins = 0
1122            while not thread_pool.wait_for_all_done(60.0):
1123                mins += 1
1124                alloc_log.info("Waiting for master (it has been %d mins)" \
1125                        % mins)
1126            # update failed to include the master, if it failed
1127            failed = [ t.getName() for t in threads if not t.rv ]
1128
1129        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1130        # If one failed clean up, unless fail_soft is set
1131        if failed:
1132            if not fail_soft:
1133                thread_pool.clear()
1134                for tb in succeeded:
1135                    # Create and start a thread to stop the segment
1136                    thread_pool.wait_for_slot()
1137                    uri = tbparams[tb]['uri']
1138                    t  = self.pooled_thread(\
1139                            target=self.terminate_segment(log=log,
1140                                testbed=tb,
1141                                cert_file=self.cert_file, 
1142                                cert_pwd=self.cert_pwd,
1143                                trusted_certs=self.trusted_certs,
1144                                caller=self.call_TerminateSegment),
1145                            args=(uri, tbparams[tb]['federant']['allocID']),
1146                            name=tb,
1147                            pdata=thread_pool, trace_file=self.trace_file)
1148                    t.start()
1149                # Wait until all finish
1150                thread_pool.wait_for_all_done()
1151
1152                # release the allocations
1153                for tb in tbparams.keys():
1154                    self.release_access(tb, tbparams[tb]['allocID'],
1155                            tbparams[tb].get('uri', None))
1156                # Remove the placeholder
1157                self.state_lock.acquire()
1158                self.state[eid]['experimentStatus'] = 'failed'
1159                if self.state_filename: self.write_state()
1160                self.state_lock.release()
1161
1162                log.error("Swap in failed on %s" % ",".join(failed))
1163                return
1164        else:
1165            log.info("[start_segment]: Experiment %s active" % eid)
1166
1167
1168        # Walk up tmpdir, deleting as we go
1169        if self.cleanup:
1170            log.debug("[start_experiment]: removing %s" % tmpdir)
1171            for path, dirs, files in os.walk(tmpdir, topdown=False):
1172                for f in files:
1173                    os.remove(os.path.join(path, f))
1174                for d in dirs:
1175                    os.rmdir(os.path.join(path, d))
1176            os.rmdir(tmpdir)
1177        else:
1178            log.debug("[start_experiment]: not removing %s" % tmpdir)
1179
1180        # Insert the experiment into our state and update the disk copy
1181        self.state_lock.acquire()
1182        self.state[expid]['experimentStatus'] = 'active'
1183        self.state[eid] = self.state[expid]
1184        if self.state_filename: self.write_state()
1185        self.state_lock.release()
1186        return
1187
1188
1189    def add_kit(self, e, kit):
1190        """
1191        Add a Software object created from the list of (install, location)
1192        tuples passed as kit  to the software attribute of an object e.  We
1193        do this enough to break out the code, but it's kind of a hack to
1194        avoid changing the old tuple rep.
1195        """
1196
1197        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1198
1199        if isinstance(e.software, list): e.software.extend(s)
1200        else: e.software = s
1201
1202
1203    def create_experiment_state(self, fid, req, expid, expcert, 
1204            state='starting'):
1205        """
1206        Create the initial entry in the experiment's state.  The expid and
1207        expcert are the experiment's fedid and certifacte that represents that
1208        ID, which are installed in the experiment state.  If the request
1209        includes a suggested local name that is used if possible.  If the local
1210        name is already taken by an experiment owned by this user that has
1211        failed, it is overwritten.  Otherwise new letters are added until a
1212        valid localname is found.  The generated local name is returned.
1213        """
1214
1215        if req.has_key('experimentID') and \
1216                req['experimentID'].has_key('localname'):
1217            overwrite = False
1218            eid = req['experimentID']['localname']
1219            # If there's an old failed experiment here with the same local name
1220            # and accessible by this user, we'll overwrite it, otherwise we'll
1221            # fall through and do the collision avoidance.
1222            old_expid = self.get_experiment_fedid(eid)
1223            if old_expid and self.check_experiment_access(fid, old_expid):
1224                self.state_lock.acquire()
1225                status = self.state[eid].get('experimentStatus', None)
1226                if status and status == 'failed':
1227                    # remove the old access attribute
1228                    self.auth.unset_attribute(fid, old_expid)
1229                    overwrite = True
1230                    del self.state[eid]
1231                    del self.state[old_expid]
1232                self.state_lock.release()
1233            self.state_lock.acquire()
1234            while (self.state.has_key(eid) and not overwrite):
1235                eid += random.choice(string.ascii_letters)
1236            # Initial state
1237            self.state[eid] = {
1238                    'experimentID' : \
1239                            [ { 'localname' : eid }, {'fedid': expid } ],
1240                    'experimentStatus': state,
1241                    'experimentAccess': { 'X509' : expcert },
1242                    'owner': fid,
1243                    'log' : [],
1244                }
1245            self.state[expid] = self.state[eid]
1246            if self.state_filename: self.write_state()
1247            self.state_lock.release()
1248        else:
1249            eid = self.exp_stem
1250            for i in range(0,5):
1251                eid += random.choice(string.ascii_letters)
1252            self.state_lock.acquire()
1253            while (self.state.has_key(eid)):
1254                eid = self.exp_stem
1255                for i in range(0,5):
1256                    eid += random.choice(string.ascii_letters)
1257            # Initial state
1258            self.state[eid] = {
1259                    'experimentID' : \
1260                            [ { 'localname' : eid }, {'fedid': expid } ],
1261                    'experimentStatus': state,
1262                    'experimentAccess': { 'X509' : expcert },
1263                    'owner': fid,
1264                    'log' : [],
1265                }
1266            self.state[expid] = self.state[eid]
1267            if self.state_filename: self.write_state()
1268            self.state_lock.release()
1269
1270        return eid
1271
1272
1273    def allocate_ips_to_topo(self, top):
1274        """
1275        Add an ip4_address attribute to all the hosts in the topology, based on
1276        the shared substrates on which they sit.  An /etc/hosts file is also
1277        created and returned as a list of hostfiles entries.  We also return
1278        the allocator, because we may need to allocate IPs to portals
1279        (specifically DRAGON portals).
1280        """
1281        subs = sorted(top.substrates, 
1282                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1283                reverse=True)
1284        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1285        ifs = { }
1286        hosts = [ ]
1287
1288        for idx, s in enumerate(subs):
1289            a = ips.allocate(len(s.interfaces)+2)
1290            if a :
1291                base, num = a
1292                if num < len(s.interfaces) +2 : 
1293                    raise service_error(service_error.internal,
1294                            "Allocator returned wrong number of IPs??")
1295            else:
1296                raise service_error(service_error.req, 
1297                        "Cannot allocate IP addresses")
1298
1299            base += 1
1300            for i in s.interfaces:
1301                i.attribute.append(
1302                        topdl.Attribute('ip4_address', 
1303                            "%s" % ip_addr(base)))
1304                hname = i.element.name[0]
1305                if ifs.has_key(hname):
1306                    hosts.append("%s\t%s-%s %s-%d" % \
1307                            (ip_addr(base), hname, s.name, hname,
1308                                ifs[hname]))
1309                else:
1310                    ifs[hname] = 0
1311                    hosts.append("%s\t%s-%s %s-%d %s" % \
1312                            (ip_addr(base), hname, s.name, hname,
1313                                ifs[hname], hname))
1314
1315                ifs[hname] += 1
1316                base += 1
1317        return hosts, ips
1318
1319    def get_access_to_testbeds(self, testbeds, access_user, 
1320            export_project, master, allocated, tbparams, services):
1321        """
1322        Request access to the various testbeds required for this instantiation
1323        (passed in as testbeds).  User, access_user, expoert_project and master
1324        are used to construct the correct requests.  Per-testbed parameters are
1325        returned in tbparams.
1326        """
1327        for tb in testbeds:
1328            self.get_access(tb, None, tbparams, master,
1329                    export_project, access_user, services)
1330            allocated[tb] = 1
1331
1332    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1333        """
1334        Create the sub-topologies that are needed for experiment instantiation.
1335        """
1336        for tb in testbeds:
1337            topo[tb] = top.clone()
1338            to_delete = [ ]
1339            # XXX: copy in for loop to simplify
1340            for e in topo[tb].elements:
1341                etb = e.get_attribute('testbed')
1342                if etb and etb != tb:
1343                    for i in e.interface:
1344                        for s in i.subs:
1345                            try:
1346                                s.interfaces.remove(i)
1347                            except ValueError:
1348                                raise service_error(service_error.internal,
1349                                        "Can't remove interface??")
1350                    to_delete.append(e)
1351            for e in to_delete:
1352                topo[tb].elements.remove(e)
1353            topo[tb].make_indices()
1354
1355            for e in [ e for e in topo[tb].elements \
1356                    if isinstance(e,topdl.Computer)]:
1357                if self.fedkit: self.add_kit(e, self.fedkit)
1358
1359    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1360            portal_type, iface_desc=()):
1361        """
1362        Return a new internet portal node and a dict with the connectionInfo to
1363        be attached.
1364        """
1365        dproject = tbparams[dt].get('project', 'project')
1366        ddomain = tbparams[dt].get('domain', ".example.com")
1367        mdomain = tbparams[master].get('domain', '.example.com')
1368        mproject = tbparams[master].get('project', 'project')
1369        muser = tbparams[master].get('user', 'root')
1370        smbshare = tbparams[master].get('smbshare', 'USERS')
1371
1372        if st == master or dt == master:
1373            active = ("%s" % (st == master))
1374        else:
1375            active = ("%s" % (st > dt))
1376
1377        ifaces = [ ]
1378        for sub, attrs in iface_desc:
1379            inf = topdl.Interface(
1380                    substrate=sub,
1381                    attribute=[
1382                        topdl.Attribute(
1383                            attribute=n,
1384                            value = v)
1385                        for n, v in attrs
1386                        ]
1387                    )
1388            ifaces.append(inf)
1389        info = {
1390                "type" : "ssh", 
1391                "portal": myname,
1392                'peer': "%s.%s.%s%s" %  (desthost.lower(), eid.lower(),
1393                            dproject.lower(), ddomain.lower()),
1394                'fedAttr': [ 
1395                        { 'attribute': 'masterdomain', 'value': mdomain},
1396                        { 'attribute': 'masterexperiment', 'value': 
1397                            "%s/%s" % (mproject, eid)},
1398                        { 'attribute': 'active', 'value': active},
1399                        # Move to SMB service description
1400                        { 'attribute': 'masteruser', 'value': muser},
1401                        { 'attribute': 'smbshare', 'value': smbshare},
1402                    ],
1403                }
1404        return (topdl.Computer(
1405                name=myname,
1406                attribute=[ 
1407                    topdl.Attribute(attribute=n,value=v)
1408                        for n, v in (\
1409                            ('portal', 'true'),
1410                            ('portal_type', portal_type), 
1411                        )
1412                    ],
1413                interface=ifaces,
1414                ), info)
1415
1416    def new_portal_substrate(self, st, dt, eid, tbparams):
1417        ddomain = tbparams[dt].get('domain', ".example.com")
1418        dproject = tbparams[dt].get('project', 'project')
1419        tsubstrate = \
1420                topdl.Substrate(name='%s-%s' % (st, dt),
1421                        attribute= [
1422                            topdl.Attribute(
1423                                attribute='portal',
1424                                value='true')
1425                            ]
1426                        )
1427        segment_element = topdl.Segment(
1428                id= tbparams[dt]['allocID'],
1429                type='emulab',
1430                uri = self.tbmap.get(dt, None),
1431                interface=[ 
1432                    topdl.Interface(
1433                        substrate=tsubstrate.name),
1434                    ],
1435                attribute = [
1436                    topdl.Attribute(attribute=n, value=v)
1437                        for n, v in (\
1438                            ('domain', ddomain),
1439                            ('experiment', "%s/%s" % \
1440                                    (dproject, eid)),)
1441                    ],
1442                )
1443
1444        return (tsubstrate, segment_element)
1445
1446    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
1447        if sub.capacity is None:
1448            raise service_error(service_error.internal,
1449                    "Cannot DRAGON split substrate w/o capacity")
1450        segs = [ ]
1451        substr = topdl.Substrate(name="dragon%d" % idx, 
1452                capacity=sub.capacity.clone(),
1453                attribute=[ topdl.Attribute(attribute=n, value=v)
1454                    for n, v, in (\
1455                            ('vlan', 'unassigned%d' % idx),)])
1456        for tb in tbs.keys():
1457            seg = topdl.Segment(
1458                    id = tbparams[tb]['allocID'],
1459                    type='emulab',
1460                    uri = self.tbmap.get(tb, None),
1461                    interface=[ 
1462                        topdl.Interface(
1463                            substrate=substr.name),
1464                        ],
1465                    attribute=[ topdl.Attribute(
1466                        attribute='dragon_endpoint', 
1467                        value=tbparams[tb]['dragon']),
1468                        ]
1469                    )
1470            if tbparams[tb].has_key('vlans'):
1471                seg.set_attribute('vlans', tbparams[tb]['vlans'])
1472            segs.append(seg)
1473
1474        topo["dragon%d" %idx] = \
1475                topdl.Topology(substrates=[substr], elements=segs,
1476                        attribute=[
1477                            topdl.Attribute(attribute="transit", value='true'),
1478                            topdl.Attribute(attribute="dynamic", value='true'),
1479                            topdl.Attribute(attribute="testbed", value='dragon'),
1480                            ]
1481                        )
1482
1483    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid):
1484        """
1485        Add attribiutes to the various elements indicating that they are to be
1486        dragon connected and create a dragon segment in tops to be
1487        instantiated.
1488        """
1489
1490        def get_substrate_from_topo(name, t):
1491            for s in t.substrates:
1492                if s.name == name: return s
1493            else: return None
1494
1495        dn = len([x for x in topo.keys() if x.startswith('dragon')])
1496        elements = [ i.element for i in sub.interfaces ]
1497        count = { }
1498        for e in elements:
1499            tb = e.get_attribute('testbed')
1500            count[tb] = count.get(tb, 0) + 1
1501
1502        for tb in tbs.keys():
1503            s = get_substrate_from_topo(sub.name, topo[tb])
1504            if s:
1505                for i in s.interfaces:
1506                    i.set_attribute('dragon_vlan', 'unassigned%d' % dn)
1507                    if count[tb] > 1: i.set_attribute('dragon_type', 'lan')
1508                    else: i.set_attribute('dragon_type', 'link')
1509            else:
1510                raise service_error(service_error.internal,
1511                        "No substrate %s in testbed %s" % (sub.name, tb))
1512
1513        self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
1514
1515    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
1516            segment_substrate, portals, connInfo):
1517        # More than one testbed is on this substrate.  Insert
1518        # some portals into the subtopologies.  st == source testbed,
1519        # dt == destination testbed.
1520        for st in tbs.keys():
1521            if not segment_substrate.has_key(st):
1522                segment_substrate[st] = { }
1523            if not portals.has_key(st): 
1524                portals[st] = { }
1525            if not connInfo.has_key(st):
1526                connInfo[st] = [ ]
1527            for dt in [ t for t in tbs.keys() if t != st]:
1528                sproject = tbparams[st].get('project', 'project')
1529                dproject = tbparams[dt].get('project', 'project')
1530                mproject = tbparams[master].get('project', 'project')
1531                sdomain = tbparams[st].get('domain', ".example.com")
1532                ddomain = tbparams[dt].get('domain', ".example.com")
1533                mdomain = tbparams[master].get('domain', '.example.com')
1534                muser = tbparams[master].get('user', 'root')
1535                smbshare = tbparams[master].get('smbshare', 'USERS')
1536                aid = tbparams[dt]['allocID']['fedid']
1537                if st == master or dt == master:
1538                    active = ("%s" % (st == master))
1539                else:
1540                    active = ("%s" %(st > dt))
1541                if not segment_substrate[st].has_key(dt):
1542                    # Put a substrate and a segment for the connected
1543                    # testbed in there.
1544                    tsubstrate, segment_element = \
1545                            self.new_portal_substrate(st, dt, eid, tbparams)
1546                    segment_substrate[st][dt] = tsubstrate
1547                    topo[st].substrates.append(tsubstrate)
1548                    topo[st].elements.append(segment_element)
1549
1550                new_portal = False
1551                if portals[st].has_key(dt):
1552                    # There's a portal set up to go to this destination.
1553                    # See if there's room to multiplex this connection on
1554                    # it.  If so, add an interface to the portal; if not,
1555                    # set up to add a portal below.
1556                    # [This little festival of braces is just a pop of the
1557                    # last element in the list of portals between st and
1558                    # dt.]
1559                    portal = portals[st][dt][-1]
1560                    mux = len([ i for i in portal.interface \
1561                            if not i.get_attribute('portal')])
1562                    if mux == self.muxmax:
1563                        new_portal = True
1564                        portal_type = "experiment"
1565                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1566                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1567                    else:
1568                        new_i = topdl.Interface(
1569                                substrate=sub.name,
1570                                attribute=[ 
1571                                    topdl.Attribute(
1572                                        attribute='ip4_address', 
1573                                        value=tbs[dt]
1574                                    )
1575                                ])
1576                        portal.interface.append(new_i)
1577                else:
1578                    # First connection to this testbed, make an empty list
1579                    # and set up to add the new portal below
1580                    new_portal = True
1581                    portals[st][dt] = [ ]
1582                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1583                    desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1584
1585                    if dt == master or st == master: portal_type = "both"
1586                    else: portal_type = "experiment"
1587
1588                if new_portal:
1589                    infs = (
1590                            (segment_substrate[st][dt].name, 
1591                                (('portal', 'true'),)),
1592                            (sub.name, 
1593                                (('ip4_address', tbs[dt]),))
1594                        )
1595                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
1596                            master, eid, myname, desthost, portal_type,
1597                            infs)
1598                    if self.fedkit:
1599                        self.add_kit(portal, self.fedkit)
1600                    if self.gatewaykit: 
1601                        self.add_kit(portal, self.gatewaykit)
1602
1603                    topo[st].elements.append(portal)
1604                    portals[st][dt].append(portal)
1605                    connInfo[st].append(info)
1606
1607    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo):
1608        # Add to the master testbed
1609        tsubstrate, segment_element = \
1610                self.new_portal_substrate(st, dt, eid, tbparams)
1611        myname = "%stunnel" % dt
1612        desthost = "%stunnel" % st
1613
1614        portal, info = self.new_portal_node(st, dt, tbparams, master,
1615                eid, myname, desthost, "control", 
1616                ((tsubstrate.name,(('portal','true'),)),))
1617        if self.fedkit:
1618            self.add_kit(portal, self.fedkit)
1619        if self.gatewaykit: 
1620            self.add_kit(portal, self.gatewaykit)
1621
1622        topo[st].substrates.append(tsubstrate)
1623        topo[st].elements.append(segment_element)
1624        topo[st].elements.append(portal)
1625        if not connInfo.has_key(st):
1626            connInfo[st] = [ ]
1627        connInfo[st].append(info)
1628
1629    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
1630            substrate, tbparams):
1631        # Add to the master testbed
1632        myname = "%stunnel" % dt
1633        desthost = "%s" % ip_addr(dip)
1634
1635        portal = self.new_portal_node(st, dt, tbparams, master,
1636                eid, myname, desthost, "control", 
1637                ((substrate.name,(
1638                    ('portal','true'),
1639                    ('ip4_address', "%s" % ip_addr(myip)),
1640                    ('dragon_vlan', 'unassigned%d' % idx),
1641                    ('dragon_type', 'link'),)),))
1642        if self.fedkit:
1643            self.add_kit(portal, self.fedkit)
1644        if self.gatewaykit: 
1645            self.add_kit(portal, self.gatewaykit)
1646
1647        return portal
1648
1649    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
1650            connInfo):
1651        """
1652        For each substrate in the main topology, find those that
1653        have nodes on more than one testbed.  Insert portal nodes
1654        into the copies of those substrates on the sub topologies.
1655        """
1656        segment_substrate = { }
1657        portals = { }
1658        for s in top.substrates:
1659            # tbs will contain an ip address on this subsrate that is in
1660            # each testbed.
1661            tbs = { }
1662            for i in s.interfaces:
1663                e = i.element
1664                tb = e.get_attribute('testbed')
1665                if tb and not tbs.has_key(tb):
1666                    for i in e.interface:
1667                        if s in i.subs:
1668                            tbs[tb]= i.get_attribute('ip4_address')
1669            if len(tbs) < 2:
1670                continue
1671
1672            # DRAGON will not create multi-site vlans yet
1673            if len(tbs) == 2 and \
1674                    all([tbparams[x].has_key('dragon') for x in tbs]):
1675                self.create_dragon_substrate(s, topo, tbs, tbparams, 
1676                        master, eid)
1677            else:
1678                self.insert_internet_portals(s, topo, tbs, tbparams, master,
1679                        eid, segment_substrate, portals, connInfo)
1680
1681        # Make sure that all the slaves have a control portal back to the
1682        # master.
1683        for tb in [ t for t in tbparams.keys() if t != master ]:
1684            if len([e for e in topo[tb].elements \
1685                    if isinstance(e, topdl.Computer) and \
1686                    e.get_attribute('portal') and \
1687                    e.get_attribute('portal_type') == 'both']) == 0:
1688
1689                if tbparams[master].has_key('dragon') \
1690                        and tbparams[tb].has_key('dragon'):
1691
1692                    idx = len([x for x in topo.keys() \
1693                            if x.startswith('dragon')])
1694                    dip, leng = ip_allocator.allocate(4)
1695                    dip += 1
1696                    mip = dip+1
1697                    csub = topdl.Substrate(
1698                            name="dragon-control-%s" % tb,
1699                            capacity=topdl.Capacity(100000.0, 'max'),
1700                            attribute=[
1701                                topdl.Attribute(
1702                                    attribute='portal',
1703                                    value='true'
1704                                    )
1705                                ]
1706                            )
1707                    seg = topdl.Segment(
1708                            id= tbparams[master]['allocID'],
1709                            type='emulab',
1710                            uri = self.tbmap.get(master, None),
1711                            interface=[ 
1712                                topdl.Interface(
1713                                    substrate=csub.name),
1714                                ],
1715                            attribute = [
1716                                topdl.Attribute(attribute=n, value=v)
1717                                    for n, v in (\
1718                                        ('domain', 
1719                                            tbparams[master].get('domain',
1720                                                ".example.com")),
1721                                        ('experiment', "%s/%s" % \
1722                                                (tbparams[master].get(
1723                                                    'project', 
1724                                                    'project'), 
1725                                                    eid)),)
1726                                ],
1727                            )
1728                    topo[tb].substrates.append(csub)
1729                    topo[tb].elements.append(
1730                            self.new_dragon_portal(tb, master, master, eid, 
1731                                dip, mip, idx, csub, tbparams))
1732                    topo[tb].elements.append(seg)
1733
1734                    mcsub = csub.clone()
1735                    seg = topdl.Segment(
1736                            id= tbparams[tb]['allocID'],
1737                            type='emulab',
1738                            uri = self.tbmap.get(tb, None),
1739                            interface=[ 
1740                                topdl.Interface(
1741                                    substrate=csub.name),
1742                                ],
1743                            attribute = [
1744                                topdl.Attribute(attribute=n, value=v)
1745                                    for n, v in (\
1746                                        ('domain', 
1747                                            tbparams[tb].get('domain',
1748                                                ".example.com")),
1749                                        ('experiment', "%s/%s" % \
1750                                                (tbparams[tb].get('project', 
1751                                                    'project'), 
1752                                                    eid)),)
1753                                ],
1754                            )
1755                    topo[master].substrates.append(mcsub)
1756                    topo[master].elements.append(
1757                            self.new_dragon_portal(master, tb, master, eid, 
1758                                mip, dip, idx, mcsub, tbparams))
1759                    topo[master].elements.append(seg)
1760
1761                    self.create_dragon_substrate(csub, topo, 
1762                            {tb: 1, master:1}, tbparams, master, eid)
1763                else:
1764                    self.add_control_portal(master, tb, master, eid, topo, 
1765                            tbparams, connInfo)
1766                    self.add_control_portal(tb, master, master, eid, topo, 
1767                            tbparams, connInfo)
1768
1769        # Connect the portal nodes into the topologies and clear out
1770        # substrates that are not in the topologies
1771        for tb in tbparams.keys():
1772            topo[tb].incorporate_elements()
1773            topo[tb].substrates = \
1774                    [s for s in topo[tb].substrates \
1775                        if len(s.interfaces) >0]
1776
1777    def wrangle_software(self, expid, top, topo, tbparams):
1778        """
1779        Copy software out to the repository directory, allocate permissions and
1780        rewrite the segment topologies to look for the software in local
1781        places.
1782        """
1783
1784        # Copy the rpms and tarfiles to a distribution directory from
1785        # which the federants can retrieve them
1786        linkpath = "%s/software" %  expid
1787        softdir ="%s/%s" % ( self.repodir, linkpath)
1788        softmap = { }
1789        # These are in a list of tuples format (each kit).  This comprehension
1790        # unwraps them into a single list of tuples that initilaizes the set of
1791        # tuples.
1792        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1793                for p, t in l ])
1794        pkgs.update([x.location for e in top.elements \
1795                for x in e.software])
1796        try:
1797            os.makedirs(softdir)
1798        except IOError, e:
1799            raise service_error(
1800                    "Cannot create software directory: %s" % e)
1801        # The actual copying.  Everything's converted into a url for copying.
1802        for pkg in pkgs:
1803            loc = pkg
1804
1805            scheme, host, path = urlparse(loc)[0:3]
1806            dest = os.path.basename(path)
1807            if not scheme:
1808                if not loc.startswith('/'):
1809                    loc = "/%s" % loc
1810                loc = "file://%s" %loc
1811            try:
1812                u = urlopen(loc)
1813            except Exception, e:
1814                raise service_error(service_error.req, 
1815                        "Cannot open %s: %s" % (loc, e))
1816            try:
1817                f = open("%s/%s" % (softdir, dest) , "w")
1818                self.log.debug("Writing %s/%s" % (softdir,dest) )
1819                data = u.read(4096)
1820                while data:
1821                    f.write(data)
1822                    data = u.read(4096)
1823                f.close()
1824                u.close()
1825            except Exception, e:
1826                raise service_error(service_error.internal,
1827                        "Could not copy %s: %s" % (loc, e))
1828            path = re.sub("/tmp", "", linkpath)
1829            # XXX
1830            softmap[pkg] = \
1831                    "%s/%s/%s" %\
1832                    ( self.repo_url, path, dest)
1833
1834            # Allow the individual segments to access the software.
1835            for tb in tbparams.keys():
1836                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1837                        "/%s/%s" % ( path, dest))
1838
1839        # Convert the software locations in the segments into the local
1840        # copies on this host
1841        for soft in [ s for tb in topo.values() \
1842                for e in tb.elements \
1843                    if getattr(e, 'software', False) \
1844                        for s in e.software ]:
1845            if softmap.has_key(soft.location):
1846                soft.location = softmap[soft.location]
1847
1848
1849    def new_experiment(self, req, fid):
1850        """
1851        The external interface to empty initial experiment creation called from
1852        the dispatcher.
1853
1854        Creates a working directory, splits the incoming description using the
1855        splitter script and parses out the avrious subsections using the
1856        lcasses above.  Once each sub-experiment is created, use pooled threads
1857        to instantiate them and start it all up.
1858        """
1859        if not self.auth.check_attribute(fid, 'new'):
1860            raise service_error(service_error.access, "New access denied")
1861
1862        try:
1863            tmpdir = tempfile.mkdtemp(prefix="split-")
1864        except IOError:
1865            raise service_error(service_error.internal, "Cannot create tmp dir")
1866
1867        try:
1868            access_user = self.accessdb[fid]
1869        except KeyError:
1870            raise service_error(service_error.internal,
1871                    "Access map and authorizer out of sync in " + \
1872                            "new_experiment for fedid %s"  % fid)
1873
1874        pid = "dummy"
1875        gid = "dummy"
1876
1877        req = req.get('NewRequestBody', None)
1878        if not req:
1879            raise service_error(service_error.req,
1880                    "Bad request format (no NewRequestBody)")
1881
1882        # Generate an ID for the experiment (slice) and a certificate that the
1883        # allocator can use to prove they own it.  We'll ship it back through
1884        # the encrypted connection.
1885        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1886
1887        #now we're done with the tmpdir, and it should be empty
1888        if self.cleanup:
1889            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1890            os.rmdir(tmpdir)
1891        else:
1892            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1893
1894        eid = self.create_experiment_state(fid, req, expid, expcert, 
1895                state='empty')
1896
1897        # Let users touch the state
1898        self.auth.set_attribute(fid, expid)
1899        self.auth.set_attribute(expid, expid)
1900        # Override fedids can manipulate state as well
1901        for o in self.overrides:
1902            self.auth.set_attribute(o, expid)
1903
1904        rv = {
1905                'experimentID': [
1906                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1907                ],
1908                'experimentStatus': 'empty',
1909                'experimentAccess': { 'X509' : expcert }
1910            }
1911
1912        return rv
1913
1914
1915    def create_experiment(self, req, fid):
1916        """
1917        The external interface to experiment creation called from the
1918        dispatcher.
1919
1920        Creates a working directory, splits the incoming description using the
1921        splitter script and parses out the avrious subsections using the
1922        lcasses above.  Once each sub-experiment is created, use pooled threads
1923        to instantiate them and start it all up.
1924        """
1925
1926        req = req.get('CreateRequestBody', None)
1927        if not req:
1928            raise service_error(service_error.req,
1929                    "Bad request format (no CreateRequestBody)")
1930
1931        # Get the experiment access
1932        exp = req.get('experimentID', None)
1933        if exp:
1934            if exp.has_key('fedid'):
1935                key = exp['fedid']
1936                expid = key
1937                eid = None
1938            elif exp.has_key('localname'):
1939                key = exp['localname']
1940                eid = key
1941                expid = None
1942            else:
1943                raise service_error(service_error.req, "Unknown lookup type")
1944        else:
1945            raise service_error(service_error.req, "No request?")
1946
1947        self.check_experiment_access(fid, key)
1948
1949        try:
1950            tmpdir = tempfile.mkdtemp(prefix="split-")
1951            os.mkdir(tmpdir+"/keys")
1952        except IOError:
1953            raise service_error(service_error.internal, "Cannot create tmp dir")
1954
1955        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1956        gw_secretkey_base = "fed.%s" % self.ssh_type
1957        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1958        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1959        tclfile = tmpdir + "/experiment.tcl"
1960        tbparams = { }
1961        try:
1962            access_user = self.accessdb[fid]
1963        except KeyError:
1964            raise service_error(service_error.internal,
1965                    "Access map and authorizer out of sync in " + \
1966                            "create_experiment for fedid %s"  % fid)
1967
1968        pid = "dummy"
1969        gid = "dummy"
1970
1971        # The tcl parser needs to read a file so put the content into that file
1972        descr=req.get('experimentdescription', None)
1973        if descr:
1974            file_content=descr.get('ns2description', None)
1975            if file_content:
1976                try:
1977                    f = open(tclfile, 'w')
1978                    f.write(file_content)
1979                    f.close()
1980                except IOError:
1981                    raise service_error(service_error.internal,
1982                            "Cannot write temp experiment description")
1983            else:
1984                raise service_error(service_error.req, 
1985                        "Only ns2descriptions supported")
1986        else:
1987            raise service_error(service_error.req, "No experiment description")
1988
1989        self.state_lock.acquire()
1990        if self.state.has_key(key):
1991            self.state[key]['experimentStatus'] = "starting"
1992            for e in self.state[key].get('experimentID',[]):
1993                if not expid and e.has_key('fedid'):
1994                    expid = e['fedid']
1995                elif not eid and e.has_key('localname'):
1996                    eid = e['localname']
1997        self.state_lock.release()
1998
1999        if not (eid and expid):
2000            raise service_error(service_error.internal, 
2001                    "Cannot find local experiment info!?")
2002
2003        try: 
2004            # This catches exceptions to clear the placeholder if necessary
2005            try:
2006                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
2007            except ValueError:
2008                raise service_error(service_error.server_config, 
2009                        "Bad key type (%s)" % self.ssh_type)
2010
2011            master = req.get('master', None)
2012            if not master:
2013                raise service_error(service_error.req,
2014                        "No master testbed label")
2015            export_project = req.get('exportProject', None)
2016            if not export_project:
2017                raise service_error(service_error.req, "No export project")
2018           
2019            # Translate to topdl
2020            if self.splitter_url:
2021                # XXX: need remote topdl translator
2022                self.log.debug("Calling remote splitter at %s" % \
2023                        self.splitter_url)
2024                split_data = self.remote_splitter(self.splitter_url,
2025                        file_content, master)
2026            else:
2027                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
2028                    str(self.muxmax), '-m', master]
2029
2030                if self.fedkit:
2031                    tclcmd.append('-k')
2032
2033                if self.gatewaykit:
2034                    tclcmd.append('-K')
2035
2036                tclcmd.extend([pid, gid, eid, tclfile])
2037
2038                self.log.debug("running local splitter %s", " ".join(tclcmd))
2039                # This is just fantastic.  As a side effect the parser copies
2040                # tb_compat.tcl into the current directory, so that directory
2041                # must be writable by the fedd user.  Doing this in the
2042                # temporary subdir ensures this is the case.
2043                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
2044                        cwd=tmpdir)
2045                split_data = tclparser.stdout
2046
2047            top = topdl.topology_from_xml(file=split_data, top="experiment")
2048
2049            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2050             # Find the testbeds to look up
2051            testbeds = set([ a.value for e in top.elements \
2052                    for a in e.attribute \
2053                    if a.attribute == 'testbed'] )
2054
2055            allocated = { }         # Testbeds we can access
2056            topo ={ }               # Sub topologies
2057            connInfo = { }          # Connection information
2058            services = [ ]
2059            self.get_access_to_testbeds(testbeds, access_user, 
2060                    export_project, master, allocated, tbparams, services)
2061            self.split_topology(top, topo, testbeds, eid, master, tbparams)
2062
2063            # Copy configuration files into the remote file store
2064            # The config urlpath
2065            configpath = "/%s/config" % expid
2066            # The config file system location
2067            configdir ="%s%s" % ( self.repodir, configpath)
2068            try:
2069                os.makedirs(configdir)
2070            except IOError, e:
2071                raise service_error(
2072                        "Cannot create config directory: %s" % e)
2073            try:
2074                f = open("%s/hosts" % configdir, "w")
2075                f.write('\n'.join(hosts))
2076                f.close()
2077            except IOError, e:
2078                raise service_error(service_error.internal, 
2079                        "Cannot write hosts file: %s" % e)
2080            try:
2081                copy_file("%s" % gw_pubkey, "%s/%s" % \
2082                        (configdir, gw_pubkey_base))
2083                copy_file("%s" % gw_secretkey, "%s/%s" % \
2084                        (configdir, gw_secretkey_base))
2085            except IOError, e:
2086                raise service_error(service_error.internal, 
2087                        "Cannot copy keyfiles: %s" % e)
2088
2089            # Allow the individual testbeds to access the configuration files.
2090            for tb in tbparams.keys():
2091                asignee = tbparams[tb]['allocID']['fedid']
2092                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
2093                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
2094
2095            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
2096                    connInfo)
2097            # Now get access to the dynamic testbeds
2098            for k, t in topo.items():
2099                if not t.get_attribute('dynamic'):
2100                    continue
2101                tb = t.get_attribute('testbed')
2102                if tb: 
2103                    self.get_access(tb, None, user, tbparams, master, 
2104                            export_project, access_user, services)
2105                    tbparams[k] = tbparams[tb]
2106                    del tbparams[tb]
2107                    allocated[k] = 1
2108                else:
2109                    raise service_error(service_error.internal, 
2110                            "Dynamic allocation from no testbed!?")
2111
2112            self.wrangle_software(expid, top, topo, tbparams)
2113
2114            vtopo = topdl.topology_to_vtopo(top)
2115            vis = self.genviz(vtopo)
2116
2117            # save federant information
2118            for k in allocated.keys():
2119                tbparams[k]['federant'] = {
2120                        'name': [ { 'localname' : eid} ],
2121                        'allocID' : tbparams[k]['allocID'],
2122                        'master' : k == master,
2123                        'uri': tbparams[k]['uri'],
2124                    }
2125                if tbparams[k].has_key('emulab'):
2126                        tbparams[k]['federant']['emulab'] = \
2127                                tbparams[k]['emulab']
2128
2129            self.state_lock.acquire()
2130            self.state[eid]['vtopo'] = vtopo
2131            self.state[eid]['vis'] = vis
2132            self.state[expid]['federant'] = \
2133                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2134                        if tbparams[tb].has_key('federant') ]
2135            if self.state_filename: 
2136                self.write_state()
2137            self.state_lock.release()
2138        except service_error, e:
2139            # If something goes wrong in the parse (usually an access error)
2140            # clear the placeholder state.  From here on out the code delays
2141            # exceptions.  Failing at this point returns a fault to the remote
2142            # caller.
2143
2144            self.state_lock.acquire()
2145            del self.state[eid]
2146            del self.state[expid]
2147            if self.state_filename: self.write_state()
2148            self.state_lock.release()
2149            raise e
2150
2151
2152        # Start the background swapper and return the starting state.  From
2153        # here on out, the state will stick around a while.
2154
2155        # Let users touch the state
2156        self.auth.set_attribute(fid, expid)
2157        self.auth.set_attribute(expid, expid)
2158        # Override fedids can manipulate state as well
2159        for o in self.overrides:
2160            self.auth.set_attribute(o, expid)
2161
2162        # Create a logger that logs to the experiment's state object as well as
2163        # to the main log file.
2164        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2165        alloc_collector = self.list_log(self.state[eid]['log'])
2166        h = logging.StreamHandler(alloc_collector)
2167        # XXX: there should be a global one of these rather than repeating the
2168        # code.
2169        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2170                    '%d %b %y %H:%M:%S'))
2171        alloc_log.addHandler(h)
2172       
2173        attrs = [ 
2174                {
2175                    'attribute': 'ssh_pubkey', 
2176                    'value': '%s/%s/config/%s' % \
2177                            (self.repo_url, expid, gw_pubkey_base)
2178                },
2179                {
2180                    'attribute': 'ssh_secretkey', 
2181                    'value': '%s/%s/config/%s' % \
2182                            (self.repo_url, expid, gw_secretkey_base)
2183                },
2184                {
2185                    'attribute': 'hosts', 
2186                    'value': '%s/%s/config/hosts' % \
2187                            (self.repo_url, expid)
2188                },
2189                {
2190                    'attribute': 'experiment_name',
2191                    'value': eid,
2192                },
2193            ]
2194
2195        # Start a thread to do the resource allocation
2196        t  = Thread(target=self.allocate_resources,
2197                args=(allocated, master, eid, expid, tbparams, 
2198                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
2199                    services),
2200                name=eid)
2201        t.start()
2202
2203        rv = {
2204                'experimentID': [
2205                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2206                ],
2207                'experimentStatus': 'starting',
2208            }
2209
2210        return rv
2211   
2212    def get_experiment_fedid(self, key):
2213        """
2214        find the fedid associated with the localname key in the state database.
2215        """
2216
2217        rv = None
2218        self.state_lock.acquire()
2219        if self.state.has_key(key):
2220            if isinstance(self.state[key], dict):
2221                try:
2222                    kl = [ f['fedid'] for f in \
2223                            self.state[key]['experimentID']\
2224                                if f.has_key('fedid') ]
2225                except KeyError:
2226                    self.state_lock.release()
2227                    raise service_error(service_error.internal, 
2228                            "No fedid for experiment %s when getting "+\
2229                                    "fedid(!?)" % key)
2230                if len(kl) == 1:
2231                    rv = kl[0]
2232                else:
2233                    self.state_lock.release()
2234                    raise service_error(service_error.internal, 
2235                            "multiple fedids for experiment %s when " +\
2236                                    "getting fedid(!?)" % key)
2237            else:
2238                self.state_lock.release()
2239                raise service_error(service_error.internal, 
2240                        "Unexpected state for %s" % key)
2241        self.state_lock.release()
2242        return rv
2243
2244    def check_experiment_access(self, fid, key):
2245        """
2246        Confirm that the fid has access to the experiment.  Though a request
2247        may be made in terms of a local name, the access attribute is always
2248        the experiment's fedid.
2249        """
2250        if not isinstance(key, fedid):
2251            key = self.get_experiment_fedid(key)
2252
2253        if self.auth.check_attribute(fid, key):
2254            return True
2255        else:
2256            raise service_error(service_error.access, "Access Denied")
2257
2258
2259    def get_handler(self, path, fid):
2260        self.log.info("Get handler %s %s" % (path, fid))
2261        if self.auth.check_attribute(fid, path):
2262            return ("%s/%s" % (self.repodir, path), "application/binary")
2263        else:
2264            return (None, None)
2265
2266    def get_vtopo(self, req, fid):
2267        """
2268        Return the stored virtual topology for this experiment
2269        """
2270        rv = None
2271        state = None
2272
2273        req = req.get('VtopoRequestBody', None)
2274        if not req:
2275            raise service_error(service_error.req,
2276                    "Bad request format (no VtopoRequestBody)")
2277        exp = req.get('experiment', None)
2278        if exp:
2279            if exp.has_key('fedid'):
2280                key = exp['fedid']
2281                keytype = "fedid"
2282            elif exp.has_key('localname'):
2283                key = exp['localname']
2284                keytype = "localname"
2285            else:
2286                raise service_error(service_error.req, "Unknown lookup type")
2287        else:
2288            raise service_error(service_error.req, "No request?")
2289
2290        self.check_experiment_access(fid, key)
2291
2292        self.state_lock.acquire()
2293        if self.state.has_key(key):
2294            if self.state[key].has_key('vtopo'):
2295                rv = { 'experiment' : {keytype: key },\
2296                        'vtopo': self.state[key]['vtopo'],\
2297                    }
2298            else:
2299                state = self.state[key]['experimentStatus']
2300        self.state_lock.release()
2301
2302        if rv: return rv
2303        else: 
2304            if state:
2305                raise service_error(service_error.partial, 
2306                        "Not ready: %s" % state)
2307            else:
2308                raise service_error(service_error.req, "No such experiment")
2309
2310    def get_vis(self, req, fid):
2311        """
2312        Return the stored visualization for this experiment
2313        """
2314        rv = None
2315        state = None
2316
2317        req = req.get('VisRequestBody', None)
2318        if not req:
2319            raise service_error(service_error.req,
2320                    "Bad request format (no VisRequestBody)")
2321        exp = req.get('experiment', None)
2322        if exp:
2323            if exp.has_key('fedid'):
2324                key = exp['fedid']
2325                keytype = "fedid"
2326            elif exp.has_key('localname'):
2327                key = exp['localname']
2328                keytype = "localname"
2329            else:
2330                raise service_error(service_error.req, "Unknown lookup type")
2331        else:
2332            raise service_error(service_error.req, "No request?")
2333
2334        self.check_experiment_access(fid, key)
2335
2336        self.state_lock.acquire()
2337        if self.state.has_key(key):
2338            if self.state[key].has_key('vis'):
2339                rv =  { 'experiment' : {keytype: key },\
2340                        'vis': self.state[key]['vis'],\
2341                        }
2342            else:
2343                state = self.state[key]['experimentStatus']
2344        self.state_lock.release()
2345
2346        if rv: return rv
2347        else:
2348            if state:
2349                raise service_error(service_error.partial, 
2350                        "Not ready: %s" % state)
2351            else:
2352                raise service_error(service_error.req, "No such experiment")
2353
2354    def clean_info_response(self, rv):
2355        """
2356        Remove the information in the experiment's state object that is not in
2357        the info response.
2358        """
2359        # Remove the owner info (should always be there, but...)
2360        if rv.has_key('owner'): del rv['owner']
2361
2362        # Convert the log into the allocationLog parameter and remove the
2363        # log entry (with defensive programming)
2364        if rv.has_key('log'):
2365            rv['allocationLog'] = "".join(rv['log'])
2366            del rv['log']
2367        else:
2368            rv['allocationLog'] = ""
2369
2370        if rv['experimentStatus'] != 'active':
2371            if rv.has_key('federant'): del rv['federant']
2372        else:
2373            # remove the allocationID and uri info from each federant
2374            for f in rv.get('federant', []):
2375                if f.has_key('allocID'): del f['allocID']
2376                if f.has_key('uri'): del f['uri']
2377        return rv
2378
2379    def get_info(self, req, fid):
2380        """
2381        Return all the stored info about this experiment
2382        """
2383        rv = None
2384
2385        req = req.get('InfoRequestBody', None)
2386        if not req:
2387            raise service_error(service_error.req,
2388                    "Bad request format (no InfoRequestBody)")
2389        exp = req.get('experiment', None)
2390        if exp:
2391            if exp.has_key('fedid'):
2392                key = exp['fedid']
2393                keytype = "fedid"
2394            elif exp.has_key('localname'):
2395                key = exp['localname']
2396                keytype = "localname"
2397            else:
2398                raise service_error(service_error.req, "Unknown lookup type")
2399        else:
2400            raise service_error(service_error.req, "No request?")
2401
2402        self.check_experiment_access(fid, key)
2403
2404        # The state may be massaged by the service function that called
2405        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2406        # state.
2407        self.state_lock.acquire()
2408        if self.state.has_key(key):
2409            rv = copy.deepcopy(self.state[key])
2410        self.state_lock.release()
2411
2412        if rv:
2413            return self.clean_info_response(rv)
2414        else:
2415            raise service_error(service_error.req, "No such experiment")
2416
2417    def get_multi_info(self, req, fid):
2418        """
2419        Return all the stored info that this fedid can access
2420        """
2421        rv = { 'info': [ ] }
2422
2423        self.state_lock.acquire()
2424        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2425            try:
2426                self.check_experiment_access(fid, key)
2427            except service_error, e:
2428                if e.code == service_error.access:
2429                    continue
2430                else:
2431                    self.state_lock.release()
2432                    raise e
2433
2434            if self.state.has_key(key):
2435                e = copy.deepcopy(self.state[key])
2436                e = self.clean_info_response(e)
2437                rv['info'].append(e)
2438        self.state_lock.release()
2439        return rv
2440
2441    def terminate_experiment(self, req, fid):
2442        """
2443        Swap this experiment out on the federants and delete the shared
2444        information
2445        """
2446        tbparams = { }
2447        req = req.get('TerminateRequestBody', None)
2448        if not req:
2449            raise service_error(service_error.req,
2450                    "Bad request format (no TerminateRequestBody)")
2451        force = req.get('force', False)
2452        exp = req.get('experiment', None)
2453        if exp:
2454            if exp.has_key('fedid'):
2455                key = exp['fedid']
2456                keytype = "fedid"
2457            elif exp.has_key('localname'):
2458                key = exp['localname']
2459                keytype = "localname"
2460            else:
2461                raise service_error(service_error.req, "Unknown lookup type")
2462        else:
2463            raise service_error(service_error.req, "No request?")
2464
2465        self.check_experiment_access(fid, key)
2466
2467        dealloc_list = [ ]
2468
2469
2470        # Create a logger that logs to the dealloc_list as well as to the main
2471        # log file.
2472        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2473        h = logging.StreamHandler(self.list_log(dealloc_list))
2474        # XXX: there should be a global one of these rather than repeating the
2475        # code.
2476        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2477                    '%d %b %y %H:%M:%S'))
2478        dealloc_log.addHandler(h)
2479
2480        self.state_lock.acquire()
2481        fed_exp = self.state.get(key, None)
2482
2483        if fed_exp:
2484            # This branch of the conditional holds the lock to generate a
2485            # consistent temporary tbparams variable to deallocate experiments.
2486            # It releases the lock to do the deallocations and reacquires it to
2487            # remove the experiment state when the termination is complete.
2488
2489            # First make sure that the experiment creation is complete.
2490            status = fed_exp.get('experimentStatus', None)
2491
2492            if status:
2493                if status in ('starting', 'terminating'):
2494                    if not force:
2495                        self.state_lock.release()
2496                        raise service_error(service_error.partial, 
2497                                'Experiment still being created or destroyed')
2498                    else:
2499                        self.log.warning('Experiment in %s state ' % status + \
2500                                'being terminated by force.')
2501            else:
2502                # No status??? trouble
2503                self.state_lock.release()
2504                raise service_error(service_error.internal,
2505                        "Experiment has no status!?")
2506
2507            ids = []
2508            #  experimentID is a list of dicts that are self-describing
2509            #  identifiers.  This finds all the fedids and localnames - the
2510            #  keys of self.state - and puts them into ids.
2511            for id in fed_exp.get('experimentID', []):
2512                if id.has_key('fedid'): ids.append(id['fedid'])
2513                if id.has_key('localname'): ids.append(id['localname'])
2514
2515            # Collect the allocation/segment ids into a dict keyed by the fedid
2516            # of the allocation (or a monotonically increasing integer) that
2517            # contains a tuple of uri, aid (which is a dict...)
2518            for i, fed in enumerate(fed_exp.get('federant', [])):
2519                try:
2520                    uri = fed['uri']
2521                    aid = fed['allocID']
2522                    k = fed['allocID'].get('fedid', i)
2523                except KeyError, e:
2524                    continue
2525                tbparams[k] = (uri, aid)
2526            fed_exp['experimentStatus'] = 'terminating'
2527            if self.state_filename: self.write_state()
2528            self.state_lock.release()
2529
2530            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2531            # then completes, so we can't wait if nothing starts.  So, no
2532            # tbparams, no start.
2533            if len(tbparams) > 0:
2534                thread_pool = self.thread_pool(self.nthreads)
2535                for k in tbparams.keys():
2536                    # Create and start a thread to stop the segment
2537                    thread_pool.wait_for_slot()
2538                    uri, aid = tbparams[k]
2539                    t  = self.pooled_thread(\
2540                            target=self.terminate_segment(log=dealloc_log,
2541                                testbed=uri,
2542                                cert_file=self.cert_file, 
2543                                cert_pwd=self.cert_pwd,
2544                                trusted_certs=self.trusted_certs,
2545                                caller=self.call_TerminateSegment),
2546                            args=(uri, aid), name=k,
2547                            pdata=thread_pool, trace_file=self.trace_file)
2548                    t.start()
2549                # Wait for completions
2550                thread_pool.wait_for_all_done()
2551
2552            # release the allocations (failed experiments have done this
2553            # already, and starting experiments may be in odd states, so we
2554            # ignore errors releasing those allocations
2555            try: 
2556                for k in tbparams.keys():
2557                    # This releases access by uri
2558                    uri, aid = tbparams[k]
2559                    self.release_access(None, aid, uri=uri)
2560            except service_error, e:
2561                if status != 'failed' and not force:
2562                    raise e
2563
2564            # Remove the terminated experiment
2565            self.state_lock.acquire()
2566            for id in ids:
2567                if self.state.has_key(id): del self.state[id]
2568
2569            if self.state_filename: self.write_state()
2570            self.state_lock.release()
2571
2572            return { 
2573                    'experiment': exp , 
2574                    'deallocationLog': "".join(dealloc_list),
2575                    }
2576        else:
2577            # Don't forget to release the lock
2578            self.state_lock.release()
2579            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.