source: fedd/federation/experiment_control.py @ 1d913e13

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 1d913e13 was 13e3dd2, checked in by Ted Faber <faber@…>, 15 years ago

Proper multiplexing of portal nodes, correct determination of portal type and
some modularization to encourage it.

  • Property mode set to 100644
File size: 78.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self, timeout=None):
123            """
124            Wait until all active threads finish (and at least one has
125            started).  If a timeout is given, return after waiting that long
126            for termination.  If all threads are done (and one has started in
127            the since the last clear()) return True, otherwise False.
128            """
129            if timeout:
130                deadline = time.time() + timeout
131            self.acquire()
132            while self.started == 0 or self.started > self.terminated:
133                self.wait(timeout)
134                if timeout:
135                    if time.time() > deadline:
136                        break
137                    timeout = deadline - time.time()
138            self.release()
139            return not (self.started == 0 or self.started > self.terminated)
140
141    class pooled_thread(Thread):
142        """
143        One of a set of threads dedicated to a specific task.  Uses the
144        thread_pool class above for coordination.
145        """
146        def __init__(self, group=None, target=None, name=None, args=(), 
147                kwargs={}, pdata=None, trace_file=None):
148            Thread.__init__(self, group, target, name, args, kwargs)
149            self.rv = None          # Return value of the ops in this thread
150            self.exception = None   # Exception that terminated this thread
151            self.target=target      # Target function to run on start()
152            self.args = args        # Args to pass to target
153            self.kwargs = kwargs    # Additional kw args
154            self.pdata = pdata      # thread_pool for this class
155            # Logger for this thread
156            self.log = logging.getLogger("fedd.experiment_control")
157       
158        def run(self):
159            """
160            Emulate Thread.run, except add pool data manipulation and error
161            logging.
162            """
163            if self.pdata:
164                self.pdata.start()
165
166            if self.target:
167                try:
168                    self.rv = self.target(*self.args, **self.kwargs)
169                except service_error, s:
170                    self.exception = s
171                    self.log.error("Thread exception: %s %s" % \
172                            (s.code_string(), s.desc))
173                except:
174                    self.exception = sys.exc_info()[1]
175                    self.log.error(("Unexpected thread exception: %s" +\
176                            "Trace %s") % (self.exception,\
177                                traceback.format_exc()))
178            if self.pdata:
179                self.pdata.terminate()
180
181    call_RequestAccess = service_caller('RequestAccess')
182    call_ReleaseAccess = service_caller('ReleaseAccess')
183    call_StartSegment = service_caller('StartSegment')
184    call_TerminateSegment = service_caller('TerminateSegment')
185    call_Ns2Split = service_caller('Ns2Split')
186
187    def __init__(self, config=None, auth=None):
188        """
189        Intialize the various attributes, most from the config object
190        """
191
192        def parse_tarfile_list(tf):
193            """
194            Parse a tarfile list from the configuration.  This is a set of
195            paths and tarfiles separated by spaces.
196            """
197            rv = [ ]
198            if tf is not None:
199                tl = tf.split()
200                while len(tl) > 1:
201                    p, t = tl[0:2]
202                    del tl[0:2]
203                    rv.append((p, t))
204            return rv
205
206        self.thread_with_rv = experiment_control_local.pooled_thread
207        self.thread_pool = experiment_control_local.thread_pool
208        self.list_log = list_log.list_log
209
210        self.cert_file = config.get("experiment_control", "cert_file")
211        if self.cert_file:
212            self.cert_pwd = config.get("experiment_control", "cert_pwd")
213        else:
214            self.cert_file = config.get("globals", "cert_file")
215            self.cert_pwd = config.get("globals", "cert_pwd")
216
217        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
218                or config.get("globals", "trusted_certs")
219
220        self.repodir = config.get("experiment_control", "repodir")
221
222        self.exp_stem = "fed-stem"
223        self.log = logging.getLogger("fedd.experiment_control")
224        set_log_level(config, "experiment_control", self.log)
225        self.muxmax = 2
226        self.nthreads = 2
227        self.randomize_experiments = False
228
229        self.splitter = None
230        self.ssh_keygen = "/usr/bin/ssh-keygen"
231        self.ssh_identity_file = None
232
233
234        self.debug = config.getboolean("experiment_control", "create_debug")
235        self.state_filename = config.get("experiment_control", 
236                "experiment_state")
237        self.splitter_url = config.get("experiment_control", "splitter_uri")
238        self.fedkit = parse_tarfile_list(\
239                config.get("experiment_control", "fedkit"))
240        self.gatewaykit = parse_tarfile_list(\
241                config.get("experiment_control", "gatewaykit"))
242        accessdb_file = config.get("experiment_control", "accessdb")
243
244        self.ssh_pubkey_file = config.get("experiment_control", 
245                "ssh_pubkey_file")
246        self.ssh_privkey_file = config.get("experiment_control",
247                "ssh_privkey_file")
248        # NB for internal master/slave ops, not experiment setup
249        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
250
251        self.overrides = set([])
252        ovr = config.get('experiment_control', 'overrides')
253        if ovr:
254            for o in ovr.split(","):
255                o = o.strip()
256                if o.startswith('fedid:'): o = o[len('fedid:'):]
257                self.overrides.add(fedid(hexstr=o))
258
259        self.state = { }
260        self.state_lock = Lock()
261        self.tclsh = "/usr/local/bin/otclsh"
262        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
263                config.get("experiment_control", "tcl_splitter",
264                        "/usr/testbed/lib/ns2ir/parse.tcl")
265        mapdb_file = config.get("experiment_control", "mapdb")
266        self.trace_file = sys.stderr
267
268        self.def_expstart = \
269                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
270                "/tmp/federate";
271        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
272                "FEDDIR/hosts";
273        self.def_gwstart = \
274                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
275                "/tmp/bridge.log";
276        self.def_mgwstart = \
277                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
278                "/tmp/bridge.log";
279        self.def_gwimage = "FBSD61-TUNNEL2";
280        self.def_gwtype = "pc";
281        self.local_access = { }
282
283        if auth:
284            self.auth = auth
285        else:
286            self.log.error(\
287                    "[access]: No authorizer initialized, creating local one.")
288            auth = authorizer()
289
290
291        if self.ssh_pubkey_file:
292            try:
293                f = open(self.ssh_pubkey_file, 'r')
294                self.ssh_pubkey = f.read()
295                f.close()
296            except IOError:
297                raise service_error(service_error.internal,
298                        "Cannot read sshpubkey")
299        else:
300            raise service_error(service_error.internal, 
301                    "No SSH public key file?")
302
303        if not self.ssh_privkey_file:
304            raise service_error(service_error.internal, 
305                    "No SSH public key file?")
306
307
308        if mapdb_file:
309            self.read_mapdb(mapdb_file)
310        else:
311            self.log.warn("[experiment_control] No testbed map, using defaults")
312            self.tbmap = { 
313                    'deter':'https://users.isi.deterlab.net:23235',
314                    'emulab':'https://users.isi.deterlab.net:23236',
315                    'ucb':'https://users.isi.deterlab.net:23237',
316                    }
317
318        if accessdb_file:
319                self.read_accessdb(accessdb_file)
320        else:
321            raise service_error(service_error.internal,
322                    "No accessdb specified in config")
323
324        # Grab saved state.  OK to do this w/o locking because it's read only
325        # and only one thread should be in existence that can see self.state at
326        # this point.
327        if self.state_filename:
328            self.read_state()
329
330        # Dispatch tables
331        self.soap_services = {\
332                'Create': soap_handler('Create', self.create_experiment),
333                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
334                'Vis': soap_handler('Vis', self.get_vis),
335                'Info': soap_handler('Info', self.get_info),
336                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
337                'Terminate': soap_handler('Terminate', 
338                    self.terminate_experiment),
339        }
340
341        self.xmlrpc_services = {\
342                'Create': xmlrpc_handler('Create', self.create_experiment),
343                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
344                'Vis': xmlrpc_handler('Vis', self.get_vis),
345                'Info': xmlrpc_handler('Info', self.get_info),
346                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
347                'Terminate': xmlrpc_handler('Terminate',
348                    self.terminate_experiment),
349        }
350
351    # Call while holding self.state_lock
352    def write_state(self):
353        """
354        Write a new copy of experiment state after copying the existing state
355        to a backup.
356
357        State format is a simple pickling of the state dictionary.
358        """
359        if os.access(self.state_filename, os.W_OK):
360            copy_file(self.state_filename, \
361                    "%s.bak" % self.state_filename)
362        try:
363            f = open(self.state_filename, 'w')
364            pickle.dump(self.state, f)
365        except IOError, e:
366            self.log.error("Can't write file %s: %s" % \
367                    (self.state_filename, e))
368        except pickle.PicklingError, e:
369            self.log.error("Pickling problem: %s" % e)
370        except TypeError, e:
371            self.log.error("Pickling problem (TypeError): %s" % e)
372
373    # Call while holding self.state_lock
374    def read_state(self):
375        """
376        Read a new copy of experiment state.  Old state is overwritten.
377
378        State format is a simple pickling of the state dictionary.
379        """
380       
381        def get_experiment_id(state):
382            """
383            Pull the fedid experimentID out of the saved state.  This is kind
384            of a gross walk through the dict.
385            """
386
387            if state.has_key('experimentID'):
388                for e in state['experimentID']:
389                    if e.has_key('fedid'):
390                        return e['fedid']
391                else:
392                    return None
393            else:
394                return None
395
396        def get_alloc_ids(state):
397            """
398            Pull the fedids of the identifiers of each allocation from the
399            state.  Again, a dict dive that's best isolated.
400            """
401
402            return [ f['allocID']['fedid'] 
403                    for f in state.get('federant',[]) \
404                        if f.has_key('allocID') and \
405                            f['allocID'].has_key('fedid')]
406
407
408        try:
409            f = open(self.state_filename, "r")
410            self.state = pickle.load(f)
411            self.log.debug("[read_state]: Read state from %s" % \
412                    self.state_filename)
413        except IOError, e:
414            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
415                    % (self.state_filename, e))
416        except pickle.UnpicklingError, e:
417            self.log.warning(("[read_state]: No saved state: " + \
418                    "Unpickling failed: %s") % e)
419       
420        for s in self.state.values():
421            try:
422
423                eid = get_experiment_id(s)
424                if eid : 
425                    # Give the owner rights to the experiment
426                    self.auth.set_attribute(s['owner'], eid)
427                    # And holders of the eid as well
428                    self.auth.set_attribute(eid, eid)
429                    # allow overrides to control experiments as well
430                    for o in self.overrides:
431                        self.auth.set_attribute(o, eid)
432                    # Set permissions to allow reading of the software repo, if
433                    # any, as well.
434                    for a in get_alloc_ids(s):
435                        self.auth.set_attribute(a, 'repo/%s' % eid)
436                else:
437                    raise KeyError("No experiment id")
438            except KeyError, e:
439                self.log.warning("[read_state]: State ownership or identity " +\
440                        "misformatted in %s: %s" % (self.state_filename, e))
441
442
443    def read_accessdb(self, accessdb_file):
444        """
445        Read the mapping from fedids that can create experiments to their name
446        in the 3-level access namespace.  All will be asserted from this
447        testbed and can include the local username and porject that will be
448        asserted on their behalf by this fedd.  Each fedid is also added to the
449        authorization system with the "create" attribute.
450        """
451        self.accessdb = {}
452        # These are the regexps for parsing the db
453        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
454        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
455                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
456        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
457                "\s*->\s*(" + name_expr + ")\s*$")
458        lineno = 0
459
460        # Parse the mappings and store in self.authdb, a dict of
461        # fedid -> (proj, user)
462        try:
463            f = open(accessdb_file, "r")
464            for line in f:
465                lineno += 1
466                line = line.strip()
467                if len(line) == 0 or line.startswith('#'):
468                    continue
469                m = project_line.match(line)
470                if m:
471                    fid = fedid(hexstr=m.group(1))
472                    project, user = m.group(2,3)
473                    if not self.accessdb.has_key(fid):
474                        self.accessdb[fid] = []
475                    self.accessdb[fid].append((project, user))
476                    continue
477
478                m = user_line.match(line)
479                if m:
480                    fid = fedid(hexstr=m.group(1))
481                    project = None
482                    user = m.group(2)
483                    if not self.accessdb.has_key(fid):
484                        self.accessdb[fid] = []
485                    self.accessdb[fid].append((project, user))
486                    continue
487                self.log.warn("[experiment_control] Error parsing access " +\
488                        "db %s at line %d" %  (accessdb_file, lineno))
489        except IOError:
490            raise service_error(service_error.internal,
491                    "Error opening/reading %s as experiment " +\
492                            "control accessdb" %  accessdb_file)
493        f.close()
494
495        # Initialize the authorization attributes
496        for fid in self.accessdb.keys():
497            self.auth.set_attribute(fid, 'create')
498
499    def read_mapdb(self, file):
500        """
501        Read a simple colon separated list of mappings for the
502        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
503        """
504
505        self.tbmap = { }
506        lineno =0
507        try:
508            f = open(file, "r")
509            for line in f:
510                lineno += 1
511                line = line.strip()
512                if line.startswith('#') or len(line) == 0:
513                    continue
514                try:
515                    label, url = line.split(':', 1)
516                    self.tbmap[label] = url
517                except ValueError, e:
518                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
519                            "map db: %s %s" % (lineno, line, e))
520        except IOError, e:
521            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
522                    "open %s: %s" % (file, e))
523        f.close()
524       
525    def generate_ssh_keys(self, dest, type="rsa" ):
526        """
527        Generate a set of keys for the gateways to use to talk.
528
529        Keys are of type type and are stored in the required dest file.
530        """
531        valid_types = ("rsa", "dsa")
532        t = type.lower();
533        if t not in valid_types: raise ValueError
534        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
535
536        try:
537            trace = open("/dev/null", "w")
538        except IOError:
539            raise service_error(service_error.internal,
540                    "Cannot open /dev/null??");
541
542        # May raise CalledProcessError
543        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
544        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
545        if rv != 0:
546            raise service_error(service_error.internal, 
547                    "Cannot generate nonce ssh keys.  %s return code %d" \
548                            % (self.ssh_keygen, rv))
549
550    def gentopo(self, str):
551        """
552        Generate the topology dtat structure from the splitter's XML
553        representation of it.
554
555        The topology XML looks like:
556            <experiment>
557                <nodes>
558                    <node><vname></vname><ips>ip1:ip2</ips></node>
559                </nodes>
560                <lans>
561                    <lan>
562                        <vname></vname><vnode></vnode><ip></ip>
563                        <bandwidth></bandwidth><member>node:port</member>
564                    </lan>
565                </lans>
566        """
567        class topo_parse:
568            """
569            Parse the topology XML and create the dats structure.
570            """
571            def __init__(self):
572                # Typing of the subelements for data conversion
573                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
574                self.int_subelements = ( 'bandwidth',)
575                self.float_subelements = ( 'delay',)
576                # The final data structure
577                self.nodes = [ ]
578                self.lans =  [ ]
579                self.topo = { \
580                        'node': self.nodes,\
581                        'lan' : self.lans,\
582                    }
583                self.element = { }  # Current element being created
584                self.chars = ""     # Last text seen
585
586            def end_element(self, name):
587                # After each sub element the contents is added to the current
588                # element or to the appropriate list.
589                if name == 'node':
590                    self.nodes.append(self.element)
591                    self.element = { }
592                elif name == 'lan':
593                    self.lans.append(self.element)
594                    self.element = { }
595                elif name in self.str_subelements:
596                    self.element[name] = self.chars
597                    self.chars = ""
598                elif name in self.int_subelements:
599                    self.element[name] = int(self.chars)
600                    self.chars = ""
601                elif name in self.float_subelements:
602                    self.element[name] = float(self.chars)
603                    self.chars = ""
604
605            def found_chars(self, data):
606                self.chars += data.rstrip()
607
608
609        tp = topo_parse();
610        parser = xml.parsers.expat.ParserCreate()
611        parser.EndElementHandler = tp.end_element
612        parser.CharacterDataHandler = tp.found_chars
613
614        parser.Parse(str)
615
616        return tp.topo
617       
618
619    def genviz(self, topo):
620        """
621        Generate the visualization the virtual topology
622        """
623
624        neato = "/usr/local/bin/neato"
625        # These are used to parse neato output and to create the visualization
626        # file.
627        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
628        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
629                "%s</type></node>"
630
631        try:
632            # Node names
633            nodes = [ n['vname'] for n in topo['node'] ]
634            topo_lans = topo['lan']
635        except KeyError, e:
636            raise service_error(service_error.internal, "Bad topology: %s" %e)
637
638        lans = { }
639        links = { }
640
641        # Walk through the virtual topology, organizing the connections into
642        # 2-node connections (links) and more-than-2-node connections (lans).
643        # When a lan is created, it's added to the list of nodes (there's a
644        # node in the visualization for the lan).
645        for l in topo_lans:
646            if links.has_key(l['vname']):
647                if len(links[l['vname']]) < 2:
648                    links[l['vname']].append(l['vnode'])
649                else:
650                    nodes.append(l['vname'])
651                    lans[l['vname']] = links[l['vname']]
652                    del links[l['vname']]
653                    lans[l['vname']].append(l['vnode'])
654            elif lans.has_key(l['vname']):
655                lans[l['vname']].append(l['vnode'])
656            else:
657                links[l['vname']] = [ l['vnode'] ]
658
659
660        # Open up a temporary file for dot to turn into a visualization
661        try:
662            df, dotname = tempfile.mkstemp()
663            dotfile = os.fdopen(df, 'w')
664        except IOError:
665            raise service_error(service_error.internal,
666                    "Failed to open file in genviz")
667
668        try:
669            dnull = open('/dev/null', 'w')
670        except IOError:
671            service_error(service_error.internal,
672                    "Failed to open /dev/null in genviz")
673
674        # Generate a dot/neato input file from the links, nodes and lans
675        try:
676            print >>dotfile, "graph G {"
677            for n in nodes:
678                print >>dotfile, '\t"%s"' % n
679            for l in links.keys():
680                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
681            for l in lans.keys():
682                for n in lans[l]:
683                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
684            print >>dotfile, "}"
685            dotfile.close()
686        except TypeError:
687            raise service_error(service_error.internal,
688                    "Single endpoint link in vtopo")
689        except IOError:
690            raise service_error(service_error.internal, "Cannot write dot file")
691
692        # Use dot to create a visualization
693        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
694                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
695                close_fds=True)
696        dnull.close()
697
698        # Translate dot to vis format
699        vis_nodes = [ ]
700        vis = { 'node': vis_nodes }
701        for line in dot.stdout:
702            m = vis_re.match(line)
703            if m:
704                vn = m.group(1)
705                vis_node = {'name': vn, \
706                        'x': float(m.group(2)),\
707                        'y' : float(m.group(3)),\
708                    }
709                if vn in links.keys() or vn in lans.keys():
710                    vis_node['type'] = 'lan'
711                else:
712                    vis_node['type'] = 'node'
713                vis_nodes.append(vis_node)
714        rv = dot.wait()
715
716        os.remove(dotname)
717        if rv == 0 : return vis
718        else: return None
719
720    def get_access(self, tb, nodes, user, tbparam, master, export_project,
721            access_user):
722        """
723        Get access to testbed through fedd and set the parameters for that tb
724        """
725        uri = self.tbmap.get(tb, None)
726        if not uri:
727            raise service_error(serice_error.server_config, 
728                    "Unknown testbed: %s" % tb)
729
730        # currently this lumps all users into one service access group
731        service_keys = [ a for u in user \
732                for a in u.get('access', []) \
733                    if a.has_key('sshPubkey')]
734
735        if len(service_keys) == 0:
736            raise service_error(service_error.req, 
737                    "Must have at least one SSH pubkey for services")
738
739
740        for p, u in access_user:
741            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
742                    "to %s") %  ((p or "None"), u, uri))
743
744            if p:
745                # Request with user and project specified
746                req = {\
747                        'destinationTestbed' : { 'uri' : uri },
748                        'project': { 
749                            'name': {'localname': p},
750                            'user': [ {'userID': { 'localname': u } } ],
751                            },
752                        'user':  user,
753                        'allocID' : { 'localname': 'test' },
754                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
755                        'serviceAccess' : service_keys
756                    }
757            else:
758                # Request with only user specified
759                req = {\
760                        'destinationTestbed' : { 'uri' : uri },
761                        'user':  [ {'userID': { 'localname': u } } ],
762                        'allocID' : { 'localname': 'test' },
763                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
764                        'serviceAccess' : service_keys
765                    }
766
767            if tb == master:
768                # NB, the export_project parameter is a dict that includes
769                # the type
770                req['exportProject'] = export_project
771
772            # node resources if any
773            if nodes != None and len(nodes) > 0:
774                rnodes = [ ]
775                for n in nodes:
776                    rn = { }
777                    image, hw, count = n.split(":")
778                    if image: rn['image'] = [ image ]
779                    if hw: rn['hardware'] = [ hw ]
780                    if count and int(count) >0 : rn['count'] = int(count)
781                    rnodes.append(rn)
782                req['resources']= { }
783                req['resources']['node'] = rnodes
784
785            try:
786                if self.local_access.has_key(uri):
787                    # Local access call
788                    req = { 'RequestAccessRequestBody' : req }
789                    r = self.local_access[uri].RequestAccess(req, 
790                            fedid(file=self.cert_file))
791                    r = { 'RequestAccessResponseBody' : r }
792                else:
793                    r = self.call_RequestAccess(uri, req, 
794                            self.cert_file, self.cert_pwd, self.trusted_certs)
795            except service_error, e:
796                if e.code == service_error.access:
797                    self.log.debug("[get_access] Access denied")
798                    r = None
799                    continue
800                else:
801                    raise e
802
803            if r.has_key('RequestAccessResponseBody'):
804                # Through to here we have a valid response, not a fault.
805                # Access denied is a fault, so something better or worse than
806                # access denied has happened.
807                r = r['RequestAccessResponseBody']
808                self.log.debug("[get_access] Access granted")
809                break
810            else:
811                raise service_error(service_error.protocol,
812                        "Bad proxy response")
813       
814        if not r:
815            raise service_error(service_error.access, 
816                    "Access denied by %s (%s)" % (tb, uri))
817
818        e = r['emulab']
819        p = e['project']
820        tbparam[tb] = { 
821                "boss": e['boss'],
822                "host": e['ops'],
823                "domain": e['domain'],
824                "fs": e['fileServer'],
825                "eventserver": e['eventServer'],
826                "project": unpack_id(p['name']),
827                "emulab" : e,
828                "allocID" : r['allocID'],
829                }
830        # Make the testbed name be the label the user applied
831        p['testbed'] = {'localname': tb }
832
833        for u in p['user']:
834            role = u.get('role', None)
835            if role == 'experimentCreation':
836                tbparam[tb]['user'] = unpack_id(u['userID'])
837                break
838        else:
839            raise service_error(service_error.internal, 
840                    "No createExperimentUser from %s" %tb)
841
842        # Add attributes to barameter space.  We don't allow attributes to
843        # overlay any parameters already installed.
844        for a in e['fedAttr']:
845            try:
846                if a['attribute'] and isinstance(a['attribute'], basestring)\
847                        and not tbparam[tb].has_key(a['attribute'].lower()):
848                    tbparam[tb][a['attribute'].lower()] = a['value']
849            except KeyError:
850                self.log.error("Bad attribute in response: %s" % a)
851       
852    def release_access(self, tb, aid):
853        """
854        Release access to testbed through fedd
855        """
856
857        uri = self.tbmap.get(tb, None)
858        if not uri:
859            raise service_error(serice_error.server_config, 
860                    "Unknown testbed: %s" % tb)
861
862        if self.local_access.has_key(uri):
863            resp = self.local_access[uri].ReleaseAccess(\
864                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
865                    fedid(file=self.cert_file))
866            resp = { 'ReleaseAccessResponseBody': resp } 
867        else:
868            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
869                    self.cert_file, self.cert_pwd, self.trusted_certs)
870
871        # better error coding
872
873    def remote_splitter(self, uri, desc, master):
874
875        req = {
876                'description' : { 'ns2description': desc },
877                'master': master,
878                'include_fedkit': bool(self.fedkit),
879                'include_gatewaykit': bool(self.gatewaykit)
880            }
881
882        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
883                self.trusted_certs)
884
885        if r.has_key('Ns2SplitResponseBody'):
886            r = r['Ns2SplitResponseBody']
887            if r.has_key('output'):
888                return r['output'].splitlines()
889            else:
890                raise service_error(service_error.protocol, 
891                        "Bad splitter response (no output)")
892        else:
893            raise service_error(service_error.protocol, "Bad splitter response")
894
895    class start_segment:
896        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
897                cert_pwd=None, trusted_certs=None, caller=None,
898                log_collector=None):
899            self.log = log
900            self.debug = debug
901            self.cert_file = cert_file
902            self.cert_pwd = cert_pwd
903            self.trusted_certs = None
904            self.caller = caller
905            self.testbed = testbed
906            self.log_collector = log_collector
907
908        def __call__(self, uri, aid, topo, master, attrs=None):
909            req = {
910                    'allocID': { 'fedid' : aid }, 
911                    'segmentdescription': { 
912                        'topdldescription': topo.to_dict(),
913                    },
914                    'master': master,
915                }
916            if attrs:
917                req['fedAttr'] = attrs
918
919            try:
920                self.log.debug("Calling StartSegment at %s " % uri)
921                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
922                        self.trusted_certs)
923                if r.has_key('StartSegmentResponseBody'):
924                    lval = r['StartSegmentResponseBody'].get('allocationLog',
925                            None)
926                    if lval and self.log_collector:
927                        for line in  lval.splitlines(True):
928                            self.log_collector.write(line)
929                else:
930                    raise service_error(service_error.internal, 
931                            "Bad response!?: %s" %r)
932                return True
933            except service_error, e:
934                self.log.error("Start segment failed on %s: %s" % \
935                        (self.testbed, e))
936                return False
937
938
939
940    class terminate_segment:
941        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
942                cert_pwd=None, trusted_certs=None, caller=None):
943            self.log = log
944            self.debug = debug
945            self.cert_file = cert_file
946            self.cert_pwd = cert_pwd
947            self.trusted_certs = None
948            self.caller = caller
949            self.testbed = testbed
950
951        def __call__(self, uri, aid ):
952            req = {
953                    'allocID': aid , 
954                }
955            try:
956                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
957                        self.trusted_certs)
958                return True
959            except service_error, e:
960                self.log.error("Terminate segment failed on %s: %s" % \
961                        (self.testbed, e))
962                return False
963   
964
965    def allocate_resources(self, allocated, master, eid, expid, expcert, 
966            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
967            attrs=None):
968        started = { }           # Testbeds where a sub-experiment started
969                                # successfully
970
971        # XXX
972        fail_soft = False
973
974        log = alloc_log or self.log
975
976        thread_pool = self.thread_pool(self.nthreads)
977        threads = [ ]
978
979        for tb in [ k for k in allocated.keys() if k != master]:
980            # Create and start a thread to start the segment, and save it to
981            # get the return value later
982            thread_pool.wait_for_slot()
983            uri = self.tbmap.get(tb, None)
984            if not uri:
985                raise service_error(service_error.internal, 
986                        "Unknown testbed %s !?" % tb)
987
988            if tbparams[tb].has_key('allocID') and \
989                    tbparams[tb]['allocID'].has_key('fedid'):
990                aid = tbparams[tb]['allocID']['fedid']
991            else:
992                raise service_error(service_error.internal, 
993                        "No alloc id for testbed %s !?" % tb)
994
995            t  = self.pooled_thread(\
996                    target=self.start_segment(log=log, debug=self.debug,
997                        testbed=tb, cert_file=self.cert_file,
998                        cert_pwd=self.cert_pwd,
999                        trusted_certs=self.trusted_certs,
1000                        caller=self.call_StartSegment,
1001                        log_collector=log_collector), 
1002                    args=(uri, aid, topo[tb], False, attrs), name=tb,
1003                    pdata=thread_pool, trace_file=self.trace_file)
1004            threads.append(t)
1005            t.start()
1006
1007        # Wait until all finish (keep pinging the log, though)
1008        mins = 0
1009        while not thread_pool.wait_for_all_done(60.0):
1010            mins += 1
1011            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1012                    % mins)
1013
1014        thread_pool.clear()
1015
1016        # If none failed, start the master
1017        failed = [ t.getName() for t in threads if not t.rv ]
1018
1019        if len(failed) == 0:
1020            uri = self.tbmap.get(master, None)
1021            if not uri:
1022                raise service_error(service_error.internal, 
1023                        "Unknown testbed %s !?" % master)
1024
1025            if tbparams[master].has_key('allocID') and \
1026                    tbparams[master]['allocID'].has_key('fedid'):
1027                aid = tbparams[master]['allocID']['fedid']
1028            else:
1029                raise service_error(service_error.internal, 
1030                    "No alloc id for testbed %s !?" % master)
1031            t = self.pooled_thread(
1032                    target=self.start_segment(log=log, debug=self.debug, 
1033                        testbed=master, cert_file=self.cert_file, 
1034                        cert_pwd=self.cert_pwd, 
1035                        trusted_certs=self.trusted_certs,
1036                        caller=self.call_StartSegment,
1037                        log_collector=log_collector),
1038                    args =(uri, aid, topo[master], True, attrs),
1039                    name=master, pdata=thread_pool, trace_file=self.trace_file)
1040            threads.append(t)
1041            t.start()
1042            # Wait until the master finishes (keep pinging the log, though)
1043            mins = 0
1044            while not thread_pool.wait_for_all_done(60.0):
1045                mins += 1
1046                alloc_log.info("Waiting for master (it has been %d mins)" \
1047                        % mins)
1048            # update failed to include the master, if it failed
1049            failed = [ t.getName() for t in threads if not t.rv ]
1050
1051        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1052        # If one failed clean up, unless fail_soft is set
1053        if failed:
1054            if not fail_soft:
1055                thread_pool.clear()
1056                for tb in succeeded:
1057                    # Create and start a thread to stop the segment
1058                    thread_pool.wait_for_slot()
1059                    uri = self.tbmap.get(tb, None)
1060                    t  = self.pooled_thread(\
1061                            target=self.terminate_segment(log=log,
1062                                testbed=tb,
1063                                cert_file=self.cert_file, 
1064                                cert_pwd=self.cert_pwd,
1065                                trusted_certs=self.trusted_certs,
1066                                caller=self.call_TerminateSegment),
1067                            args=(uri, tbparams[tb]['federant']['allocID']),
1068                            name=tb,
1069                            pdata=thread_pool, trace_file=self.trace_file)
1070                    t.start()
1071                # Wait until all finish
1072                thread_pool.wait_for_all_done()
1073
1074                # release the allocations
1075                for tb in tbparams.keys():
1076                    self.release_access(tb, tbparams[tb]['allocID'])
1077                # Remove the placeholder
1078                self.state_lock.acquire()
1079                self.state[eid]['experimentStatus'] = 'failed'
1080                if self.state_filename: self.write_state()
1081                self.state_lock.release()
1082
1083                log.error("Swap in failed on %s" % ",".join(failed))
1084                return
1085        else:
1086            log.info("[start_segment]: Experiment %s active" % eid)
1087
1088        log.debug("[start_experiment]: removing %s" % tmpdir)
1089
1090        # Walk up tmpdir, deleting as we go
1091        for path, dirs, files in os.walk(tmpdir, topdown=False):
1092            for f in files:
1093                os.remove(os.path.join(path, f))
1094            for d in dirs:
1095                os.rmdir(os.path.join(path, d))
1096        os.rmdir(tmpdir)
1097
1098        # Insert the experiment into our state and update the disk copy
1099        self.state_lock.acquire()
1100        self.state[expid]['experimentStatus'] = 'active'
1101        self.state[eid] = self.state[expid]
1102        if self.state_filename: self.write_state()
1103        self.state_lock.release()
1104        return
1105
1106
1107    def add_kit(self, e, kit):
1108        """
1109        Add a Software object created from the list of (install, location)
1110        tuples passed as kit  to the software attribute of an object e.  We
1111        do this enough to break out the code, but it's kind of a hack to
1112        avoid changing the old tuple rep.
1113        """
1114
1115        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1116
1117        if isinstance(e.software, list): e.software.extend(s)
1118        else: e.software = s
1119
1120
1121    def create_experiment_state(self, fid, req, expid, expcert):
1122        """
1123        Create the initial entry in the experiment's state.  The expid and
1124        expcert are the experiment's fedid and certifacte that represents that
1125        ID, which are installed in the experiment state.  If the request
1126        includes a suggested local name that is used if possible.  If the local
1127        name is already taken by an experiment owned by this user that has
1128        failed, it is overwriutten.  Otherwise new letters are added until a
1129        valid localname is found.  The generated local name is returned.
1130        """
1131
1132        if req.has_key('experimentID') and \
1133                req['experimentID'].has_key('localname'):
1134            overwrite = False
1135            eid = req['experimentID']['localname']
1136            # If there's an old failed experiment here with the same local name
1137            # and accessible by this user, we'll overwrite it, otherwise we'll
1138            # fall through and do the collision avoidance.
1139            old_expid = self.get_experiment_fedid(eid)
1140            if old_expid and self.check_experiment_access(fid, old_expid):
1141                self.state_lock.acquire()
1142                status = self.state[eid].get('experimentStatus', None)
1143                if status and status == 'failed':
1144                    # remove the old access attribute
1145                    self.auth.unset_attribute(fid, old_expid)
1146                    overwrite = True
1147                    del self.state[eid]
1148                    del self.state[old_expid]
1149                self.state_lock.release()
1150            self.state_lock.acquire()
1151            while (self.state.has_key(eid) and not overwrite):
1152                eid += random.choice(string.ascii_letters)
1153            # Initial state
1154            self.state[eid] = {
1155                    'experimentID' : \
1156                            [ { 'localname' : eid }, {'fedid': expid } ],
1157                    'experimentStatus': 'starting',
1158                    'experimentAccess': { 'X509' : expcert },
1159                    'owner': fid,
1160                    'log' : [],
1161                }
1162            self.state[expid] = self.state[eid]
1163            if self.state_filename: self.write_state()
1164            self.state_lock.release()
1165        else:
1166            eid = self.exp_stem
1167            for i in range(0,5):
1168                eid += random.choice(string.ascii_letters)
1169            self.state_lock.acquire()
1170            while (self.state.has_key(eid)):
1171                eid = self.exp_stem
1172                for i in range(0,5):
1173                    eid += random.choice(string.ascii_letters)
1174            # Initial state
1175            self.state[eid] = {
1176                    'experimentID' : \
1177                            [ { 'localname' : eid }, {'fedid': expid } ],
1178                    'experimentStatus': 'starting',
1179                    'experimentAccess': { 'X509' : expcert },
1180                    'owner': fid,
1181                    'log' : [],
1182                }
1183            self.state[expid] = self.state[eid]
1184            if self.state_filename: self.write_state()
1185            self.state_lock.release()
1186
1187        return eid
1188
1189
1190    def allocate_ips_to_topo(self, top):
1191        """
1192        Add an ip4_address attribute to all the hosts in teh topology, based on
1193        the shared substrates on which they sit.  An /etc/hosts file is also
1194        created and returned as a list of hostfiles entries.
1195        """
1196        subs = sorted(top.substrates, 
1197                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1198                reverse=True)
1199        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1200        ifs = { }
1201        hosts = [ ]
1202
1203        for idx, s in enumerate(subs):
1204            a = ips.allocate(len(s.interfaces)+2)
1205            if a :
1206                base, num = a
1207                if num < len(s.interfaces) +2 : 
1208                    raise service_error(service_error.internal,
1209                            "Allocator returned wrong number of IPs??")
1210            else:
1211                raise service_error(service_error.req, 
1212                        "Cannot allocate IP addresses")
1213
1214            base += 1
1215            for i in s.interfaces:
1216                i.attribute.append(
1217                        topdl.Attribute('ip4_address', 
1218                            "%s" % ip_addr(base)))
1219                hname = i.element.name[0]
1220                if ifs.has_key(hname):
1221                    hosts.append("%s\t%s-%s %s-%d" % \
1222                            (ip_addr(base), hname, s.name, hname,
1223                                ifs[hname]))
1224                else:
1225                    ifs[hname] = 0
1226                    hosts.append("%s\t%s-%s %s-%d %s" % \
1227                            (ip_addr(base), hname, s.name, hname,
1228                                ifs[hname], hname))
1229
1230                ifs[hname] += 1
1231                base += 1
1232        return hosts
1233
1234    def get_access_to_testbeds(self, testbeds, user, access_user, 
1235            export_project, master, allocated, tbparams):
1236        """
1237        Request access to the various testbeds required for this instantiation
1238        (passed in as testbeds).  User, access_user, expoert_project and master
1239        are used to construct the correct requests.  Per-testbed parameters are
1240        returned in tbparams.
1241        """
1242        for tb in testbeds:
1243            self.get_access(tb, None, user, tbparams, master,
1244                    export_project, access_user)
1245            allocated[tb] = 1
1246
1247    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1248        """
1249        Create the sub-topologies that are needed for experimetn instantiation.
1250        Along the way attach startup commands to the computers in the
1251        subtopologies.
1252        """
1253        for tb in testbeds:
1254            topo[tb] = top.clone()
1255            to_delete = [ ]
1256            for e in topo[tb].elements:
1257                etb = e.get_attribute('testbed')
1258                if etb and etb != tb:
1259                    for i in e.interface:
1260                        for s in i.subs:
1261                            try:
1262                                s.interfaces.remove(i)
1263                            except ValueError:
1264                                raise service_error(service_error.internal,
1265                                        "Can't remove interface??")
1266                    to_delete.append(e)
1267            for e in to_delete:
1268                topo[tb].elements.remove(e)
1269            topo[tb].make_indices()
1270
1271            for e in [ e for e in topo[tb].elements \
1272                    if isinstance(e,topdl.Computer)]:
1273                if tb == master:
1274                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1275                else:
1276                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1277                scmd = e.get_attribute('startup')
1278                if scmd:
1279                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1280
1281                e.set_attribute('startup', cmd)
1282                if self.fedkit: self.add_kit(e, self.fedkit)
1283
1284    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
1285            portal_type, portal_subst, subst=None, addr=None):
1286        sproject = tbparams[st].get('project', 'project')
1287        dproject = tbparams[dt].get('project', 'project')
1288        mproject = tbparams[master].get('project', 'project')
1289        sdomain = tbparams[st].get('domain', ".example.com")
1290        ddomain = tbparams[dt].get('domain', ".example.com")
1291        mdomain = tbparams[master].get('domain', '.example.com')
1292        muser = tbparams[master].get('user', 'root')
1293        smbshare = tbparams[master].get('smbshare', 'USERS')
1294        aid = tbparams[dt]['allocID']['fedid']
1295        if st == master or dt == master:
1296            active = ("%s" % (st == master))
1297        else:
1298            active = ("%s" %(st > dt))
1299
1300        ifaces = [
1301            topdl.Interface(
1302                substrate=portal_subst,
1303                attribute=[
1304                    topdl.Attribute(attribute='portal',
1305                        value='true')
1306                    ]
1307                ),
1308            ]
1309        if subst and addr:
1310            ifaces.append(
1311                    topdl.Interface(
1312                        substrate=subst,
1313                        attribute=[ 
1314                            topdl.Attribute(
1315                                attribute='ip4_address', 
1316                                value=addr,
1317                            )
1318                        ]
1319                        ))
1320
1321
1322        return topdl.Computer(
1323                name=myname,
1324                attribute=[ 
1325                    topdl.Attribute(attribute=n,value=v)
1326                        for n, v in (\
1327                            ('portal', 'true'),
1328                            ('domain', sdomain),
1329                            ('masterdomain', mdomain),
1330                            ('masterexperiment', "%s/%s" % \
1331                                    (mproject, eid)),
1332                            ('masteruser', muser),
1333                            ('smbshare', smbshare),
1334                            ('experiment', "%s/%s" % \
1335                                    (sproject, eid)),
1336                            ('peer', "%s" % desthost),
1337                            ('peer_segment', "%s" % aid),
1338                            ('scriptdir', 
1339                                "/usr/local/federation/bin"),
1340                            ('active', "%s" % active),
1341                            ('portal_type', portal_type), 
1342                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1343                    ],
1344                interface=ifaces,
1345                )
1346
1347    def new_portal_substrate(self, st, dt, eid, tbparams):
1348        ddomain = tbparams[dt].get('domain', ".example.com")
1349        dproject = tbparams[dt].get('project', 'project')
1350        tsubstrate = \
1351                topdl.Substrate(name='%s-%s' % (st, dt),
1352                        attribute= [
1353                            topdl.Attribute(
1354                                attribute='portal',
1355                                value='true')
1356                            ]
1357                        )
1358        segment_element = topdl.Segment(
1359                id= tbparams[dt]['allocID'],
1360                type='emulab',
1361                uri = self.tbmap.get(dt, None),
1362                interface=[ 
1363                    topdl.Interface(
1364                        substrate=tsubstrate.name),
1365                    ],
1366                attribute = [
1367                    topdl.Attribute(attribute=n, value=v)
1368                        for n, v in (\
1369                            ('domain', ddomain),
1370                            ('experiment', "%s/%s" % \
1371                                    (dproject, eid)),)
1372                    ],
1373                )
1374
1375        return (tsubstrate, segment_element)
1376
1377    def add_portals(self, top, topo, eid, master, tbparams):
1378        """
1379        For each substrate in the main topology, find those that
1380        have nodes on more than one testbed.  Insert portal nodes
1381        into the copies of those substrates on the sub topologies.
1382        """
1383        segment_substrate = { }
1384        portals = { }
1385        for s in top.substrates:
1386            # tbs will contain an ip address on this subsrate that is in
1387            # each testbed.
1388            tbs = { }
1389            for i in s.interfaces:
1390                e = i.element
1391                tb = e.get_attribute('testbed')
1392                if tb and not tbs.has_key(tb):
1393                    for i in e.interface:
1394                        if s in i.subs:
1395                            tbs[tb]= i.get_attribute('ip4_address')
1396            if len(tbs) < 2:
1397                continue
1398
1399            # More than one testbed is on this substrate.  Insert
1400            # some portals into the subtopologies.  st == source testbed,
1401            # dt == destination testbed.
1402            for st in tbs.keys():
1403                if not segment_substrate.has_key(st):
1404                    segment_substrate[st] = { }
1405                if not portals.has_key(st): 
1406                    portals[st] = { }
1407                for dt in [ t for t in tbs.keys() if t != st]:
1408                    sproject = tbparams[st].get('project', 'project')
1409                    dproject = tbparams[dt].get('project', 'project')
1410                    mproject = tbparams[master].get('project', 'project')
1411                    sdomain = tbparams[st].get('domain', ".example.com")
1412                    ddomain = tbparams[dt].get('domain', ".example.com")
1413                    mdomain = tbparams[master].get('domain', '.example.com')
1414                    muser = tbparams[master].get('user', 'root')
1415                    smbshare = tbparams[master].get('smbshare', 'USERS')
1416                    aid = tbparams[dt]['allocID']['fedid']
1417                    if st == master or dt == master:
1418                        active = ("%s" % (st == master))
1419                    else:
1420                        active = ("%s" %(st > dt))
1421                    if not segment_substrate[st].has_key(dt):
1422                        # Put a substrate and a segment for the connected
1423                        # testbed in there.
1424                        tsubstrate, segment_element = \
1425                                self.new_portal_substrate(st, dt, eid, tbparams)
1426                        segment_substrate[st][dt] = tsubstrate
1427                        topo[st].substrates.append(tsubstrate)
1428                        topo[st].elements.append(segment_element)
1429
1430                    new_portal = False
1431                    if portals[st].has_key(dt):
1432                        # There's a portal set up to go to this destination.
1433                        # See if there's room to multiples this connection on
1434                        # it.  If so, add an interface to the portal; if not,
1435                        # set up to add a portal below.
1436                        # [This little festival of braces is just a pop of the
1437                        # last element in the list of portals between st and
1438                        # dt.]
1439                        portal = portals[st][dt][-1]
1440                        mux = len([ i for i in portal.interface \
1441                                if not i.get_attribute('portal')])
1442                        if mux == self.muxmax:
1443                            new_portal = True
1444                            portal_type = "experiment"
1445                            myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1446                            desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1447                        else:
1448                            new_i = topdl.Interface(
1449                                    substrate=s.name,
1450                                    attribute=[ 
1451                                        topdl.Attribute(
1452                                            attribute='ip4_address', 
1453                                            value=tbs[dt]
1454                                        )
1455                                    ])
1456                            portal.interface.append(new_i)
1457                    else:
1458                        # First connection to this testbed, make an empty list
1459                        # and set up to add the new portal below
1460                        new_portal = True
1461                        portals[st][dt] = [ ]
1462                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
1463                        desthost = "%stunnel%d" % (st, len(portals[st][dt]))
1464
1465                        if dt == master or st == master: portal_type = "both"
1466                        else: portal_type = "experiment"
1467
1468                    if new_portal:
1469                        portal =  self.new_portal_node(st, dt, tbparams, 
1470                                master, eid, myname, desthost, portal_type,
1471                                segment_substrate[st][dt].name, s.name, tbs[dt])
1472                        if self.fedkit:
1473                            self.add_kit(portal, self.fedkit)
1474                        if self.gatewaykit: 
1475                            self.add_kit(portal, self.gatewaykit)
1476
1477                        topo[st].elements.append(portal)
1478                        portals[st][dt].append(portal)
1479
1480        # Make sure that all the slaves have a control portal back to the
1481        # master.
1482        for tb in [ t for t in tbparams.keys() if t != master ]:
1483            if len([e for e in topo[tb].elements \
1484                    if isinstance(e, topdl.Computer) and \
1485                    e.get_attribute('portal') and \
1486                    e.get_attribute('portal_type') == 'both']) == 0:
1487
1488                # Add to the master testbed
1489                tsubstrate, segment_element = \
1490                        self.new_portal_substrate(master, tb, eid, tbparams)
1491                myname = "%stunnel" % tb
1492                desthost = "%stunnel" % master
1493
1494                portal = self.new_portal_node(master, tb, tbparams, master,
1495                        eid, myname, desthost, "control", tsubstrate.name)
1496                if self.fedkit:
1497                    self.add_kit(portal, self.fedkit)
1498                if self.gatewaykit: 
1499                    self.add_kit(portal, self.gatewaykit)
1500
1501                topo[master].substrates.append(tsubstrate)
1502                topo[master].elements.append(segment_element)
1503                topo[master].elements.append(portal)
1504
1505                # And to the other testbed
1506
1507                tsubstrate, segment_element = \
1508                        self.new_portal_substrate(tb, master, eid, tbparams)
1509                myname = "%stunnel" % master
1510                desthost = "%stunnel" % tb
1511
1512                portal = self.new_portal_node(tb, master, tbparams, master,
1513                        eid, myname, desthost, "control", tsubstrate.name)
1514                if self.fedkit:
1515                    self.add_kit(portal, self.fedkit)
1516                if self.gatewaykit: 
1517                    self.add_kit(portal, self.gatewaykit)
1518
1519                topo[tb].substrates.append(tsubstrate)
1520                topo[tb].elements.append(segment_element)
1521                topo[tb].elements.append(portal)
1522
1523
1524        # Connect the portal nodes into the topologies and clear out
1525        # substrates that are not in the topologies
1526        for tb in tbparams.keys():
1527            topo[tb].incorporate_elements()
1528            topo[tb].substrates = \
1529                    [s for s in topo[tb].substrates \
1530                        if len(s.interfaces) >0]
1531
1532    def wrangle_software(self, expid, top, topo, tbparams):
1533        """
1534        Copy software out to the repository directory, allocate permissions and
1535        rewrite the segment topologies to look for the software in local
1536        places.
1537        """
1538
1539        # Copy the rpms and tarfiles to a distribution directory from
1540        # which the federants can retrieve them
1541        linkpath = "%s/software" %  expid
1542        softdir ="%s/%s" % ( self.repodir, linkpath)
1543        softmap = { }
1544        # These are in a list of tuples format (each kit).  This comprehension
1545        # unwraps them into a single list of tuples that initilaizes the set of
1546        # tuples.
1547        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1548                for p, t in l ])
1549        pkgs.update([x.location for e in top.elements \
1550                for x in e.software])
1551        try:
1552            os.makedirs(softdir)
1553        except IOError, e:
1554            raise service_error(
1555                    "Cannot create software directory: %s" % e)
1556        # The actual copying.  Everything's converted into a url for copying.
1557        for pkg in pkgs:
1558            loc = pkg
1559
1560            scheme, host, path = urlparse(loc)[0:3]
1561            dest = os.path.basename(path)
1562            if not scheme:
1563                if not loc.startswith('/'):
1564                    loc = "/%s" % loc
1565                loc = "file://%s" %loc
1566            try:
1567                u = urlopen(loc)
1568            except Exception, e:
1569                raise service_error(service_error.req, 
1570                        "Cannot open %s: %s" % (loc, e))
1571            try:
1572                f = open("%s/%s" % (softdir, dest) , "w")
1573                self.log.debug("Writing %s/%s" % (softdir,dest) )
1574                data = u.read(4096)
1575                while data:
1576                    f.write(data)
1577                    data = u.read(4096)
1578                f.close()
1579                u.close()
1580            except Exception, e:
1581                raise service_error(service_error.internal,
1582                        "Could not copy %s: %s" % (loc, e))
1583            path = re.sub("/tmp", "", linkpath)
1584            # XXX
1585            softmap[pkg] = \
1586                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1587                    ( path, dest)
1588
1589            # Allow the individual segments to access the software.
1590            for tb in tbparams.keys():
1591                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1592                        "/%s/%s" % ( path, dest))
1593
1594        # Convert the software locations in the segments into the local
1595        # copies on this host
1596        for soft in [ s for tb in topo.values() \
1597                for e in tb.elements \
1598                    if getattr(e, 'software', False) \
1599                        for s in e.software ]:
1600            if softmap.has_key(soft.location):
1601                soft.location = softmap[soft.location]
1602
1603
1604    def create_experiment(self, req, fid):
1605        """
1606        The external interface to experiment creation called from the
1607        dispatcher.
1608
1609        Creates a working directory, splits the incoming description using the
1610        splitter script and parses out the avrious subsections using the
1611        lcasses above.  Once each sub-experiment is created, use pooled threads
1612        to instantiate them and start it all up.
1613        """
1614        if not self.auth.check_attribute(fid, 'create'):
1615            raise service_error(service_error.access, "Create access denied")
1616
1617        try:
1618            tmpdir = tempfile.mkdtemp(prefix="split-")
1619            os.mkdir(tmpdir+"/keys")
1620        except IOError:
1621            raise service_error(service_error.internal, "Cannot create tmp dir")
1622
1623        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1624        gw_secretkey_base = "fed.%s" % self.ssh_type
1625        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1626        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1627        tclfile = tmpdir + "/experiment.tcl"
1628        tbparams = { }
1629        try:
1630            access_user = self.accessdb[fid]
1631        except KeyError:
1632            raise service_error(service_error.internal,
1633                    "Access map and authorizer out of sync in " + \
1634                            "create_experiment for fedid %s"  % fid)
1635
1636        pid = "dummy"
1637        gid = "dummy"
1638
1639        req = req.get('CreateRequestBody', None)
1640        if not req:
1641            raise service_error(service_error.req,
1642                    "Bad request format (no CreateRequestBody)")
1643        # The tcl parser needs to read a file so put the content into that file
1644        descr=req.get('experimentdescription', None)
1645        if descr:
1646            file_content=descr.get('ns2description', None)
1647            if file_content:
1648                try:
1649                    f = open(tclfile, 'w')
1650                    f.write(file_content)
1651                    f.close()
1652                except IOError:
1653                    raise service_error(service_error.internal,
1654                            "Cannot write temp experiment description")
1655            else:
1656                raise service_error(service_error.req, 
1657                        "Only ns2descriptions supported")
1658        else:
1659            raise service_error(service_error.req, "No experiment description")
1660
1661        # Generate an ID for the experiment (slice) and a certificate that the
1662        # allocator can use to prove they own it.  We'll ship it back through
1663        # the encrypted connection.
1664        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1665
1666        eid = self.create_experiment_state(fid, req, expid, expcert)
1667        try: 
1668            # This catches exceptions to clear the placeholder if necessary
1669            try:
1670                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1671            except ValueError:
1672                raise service_error(service_error.server_config, 
1673                        "Bad key type (%s)" % self.ssh_type)
1674
1675            user = req.get('user', None)
1676            if user == None:
1677                raise service_error(service_error.req, "No user")
1678
1679            master = req.get('master', None)
1680            if not master:
1681                raise service_error(service_error.req,
1682                        "No master testbed label")
1683            export_project = req.get('exportProject', None)
1684            if not export_project:
1685                raise service_error(service_error.req, "No export project")
1686           
1687            # Translate to topdl
1688            if self.splitter_url:
1689                # XXX: need remote topdl translator
1690                self.log.debug("Calling remote splitter at %s" % \
1691                        self.splitter_url)
1692                split_data = self.remote_splitter(self.splitter_url,
1693                        file_content, master)
1694            else:
1695                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1696                    str(self.muxmax), '-m', master]
1697
1698                if self.fedkit:
1699                    tclcmd.append('-k')
1700
1701                if self.gatewaykit:
1702                    tclcmd.append('-K')
1703
1704                tclcmd.extend([pid, gid, eid, tclfile])
1705
1706                self.log.debug("running local splitter %s", " ".join(tclcmd))
1707                # This is just fantastic.  As a side effect the parser copies
1708                # tb_compat.tcl into the current directory, so that directory
1709                # must be writable by the fedd user.  Doing this in the
1710                # temporary subdir ensures this is the case.
1711                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1712                        cwd=tmpdir)
1713                split_data = tclparser.stdout
1714
1715            top = topdl.topology_from_xml(file=split_data, top="experiment")
1716
1717            hosts = self.allocate_ips_to_topo(top)
1718             # Find the testbeds to look up
1719            testbeds = set([ a.value for e in top.elements \
1720                    for a in e.attribute \
1721                    if a.attribute == 'testbed'] )
1722
1723            allocated = { }         # Testbeds we can access
1724            topo ={ }               # Sub topologies
1725            self.get_access_to_testbeds(testbeds, user, access_user, 
1726                    export_project, master, allocated, tbparams)
1727            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1728
1729            # Copy configuration files into the remote file store
1730            # The config urlpath
1731            configpath = "/%s/config" % expid
1732            # The config file system location
1733            configdir ="%s%s" % ( self.repodir, configpath)
1734            try:
1735                os.makedirs(configdir)
1736            except IOError, e:
1737                raise service_error(
1738                        "Cannot create config directory: %s" % e)
1739            try:
1740                f = open("%s/hosts" % configdir, "w")
1741                f.write('\n'.join(hosts))
1742                f.close()
1743            except IOError, e:
1744                raise service_error(service_error.internal, 
1745                        "Cannot write hosts file: %s" % e)
1746            try:
1747                copy_file("%s" % gw_pubkey, "%s/%s" % \
1748                        (configdir, gw_pubkey_base))
1749                copy_file("%s" % gw_secretkey, "%s/%s" % \
1750                        (configdir, gw_secretkey_base))
1751            except IOError, e:
1752                raise service_error(service_error.internal, 
1753                        "Cannot copy keyfiles: %s" % e)
1754
1755            # Allow the individual testbeds to access the configuration files.
1756            for tb in tbparams.keys():
1757                asignee = tbparams[tb]['allocID']['fedid']
1758                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1759                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1760
1761            self.add_portals(top, topo, eid, master, tbparams)
1762            self.wrangle_software(expid, top, topo, tbparams)
1763
1764            vtopo = topdl.topology_to_vtopo(top)
1765            vis = self.genviz(vtopo)
1766
1767            # save federant information
1768            for k in allocated.keys():
1769                tbparams[k]['federant'] = {\
1770                        'name': [ { 'localname' : eid} ],\
1771                        'emulab': tbparams[k]['emulab'],\
1772                        'allocID' : tbparams[k]['allocID'],\
1773                        'master' : k == master,\
1774                    }
1775
1776            self.state_lock.acquire()
1777            self.state[eid]['vtopo'] = vtopo
1778            self.state[eid]['vis'] = vis
1779            self.state[expid]['federant'] = \
1780                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1781                        if tbparams[tb].has_key('federant') ]
1782            if self.state_filename: 
1783                self.write_state()
1784            self.state_lock.release()
1785        except service_error, e:
1786            # If something goes wrong in the parse (usually an access error)
1787            # clear the placeholder state.  From here on out the code delays
1788            # exceptions.  Failing at this point returns a fault to the remote
1789            # caller.
1790
1791            self.state_lock.acquire()
1792            del self.state[eid]
1793            del self.state[expid]
1794            if self.state_filename: self.write_state()
1795            self.state_lock.release()
1796            raise e
1797
1798
1799        # Start the background swapper and return the starting state.  From
1800        # here on out, the state will stick around a while.
1801
1802        # Let users touch the state
1803        self.auth.set_attribute(fid, expid)
1804        self.auth.set_attribute(expid, expid)
1805        # Override fedids can manipulate state as well
1806        for o in self.overrides:
1807            self.auth.set_attribute(o, expid)
1808
1809        # Create a logger that logs to the experiment's state object as well as
1810        # to the main log file.
1811        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1812        alloc_collector = self.list_log(self.state[eid]['log'])
1813        h = logging.StreamHandler(alloc_collector)
1814        # XXX: there should be a global one of these rather than repeating the
1815        # code.
1816        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1817                    '%d %b %y %H:%M:%S'))
1818        alloc_log.addHandler(h)
1819       
1820        # XXX
1821        url_base = 'https://users.isi.deterlab.net:23232'
1822        attrs = [ 
1823                {
1824                    'attribute': 'ssh_pubkey', 
1825                    'value': '%s/%s/config/%s' % \
1826                            (url_base, expid, gw_pubkey_base)
1827                },
1828                {
1829                    'attribute': 'ssh_secretkey', 
1830                    'value': '%s/%s/config/%s' % \
1831                            (url_base, expid, gw_secretkey_base)
1832                },
1833                {
1834                    'attribute': 'hosts', 
1835                    'value': '%s/%s/config/hosts' % \
1836                            (url_base, expid)
1837                },
1838                {
1839                    'attribute': 'experiment_name',
1840                    'value': eid,
1841                },
1842            ]
1843
1844        # Start a thread to do the resource allocation
1845        t  = Thread(target=self.allocate_resources,
1846                args=(allocated, master, eid, expid, expcert, tbparams, 
1847                    topo, tmpdir, alloc_log, alloc_collector, attrs),
1848                name=eid)
1849        t.start()
1850
1851        rv = {
1852                'experimentID': [
1853                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1854                ],
1855                'experimentStatus': 'starting',
1856                'experimentAccess': { 'X509' : expcert }
1857            }
1858
1859        return rv
1860   
1861    def get_experiment_fedid(self, key):
1862        """
1863        find the fedid associated with the localname key in the state database.
1864        """
1865
1866        rv = None
1867        self.state_lock.acquire()
1868        if self.state.has_key(key):
1869            if isinstance(self.state[key], dict):
1870                try:
1871                    kl = [ f['fedid'] for f in \
1872                            self.state[key]['experimentID']\
1873                                if f.has_key('fedid') ]
1874                except KeyError:
1875                    self.state_lock.release()
1876                    raise service_error(service_error.internal, 
1877                            "No fedid for experiment %s when getting "+\
1878                                    "fedid(!?)" % key)
1879                if len(kl) == 1:
1880                    rv = kl[0]
1881                else:
1882                    self.state_lock.release()
1883                    raise service_error(service_error.internal, 
1884                            "multiple fedids for experiment %s when " +\
1885                                    "getting fedid(!?)" % key)
1886            else:
1887                self.state_lock.release()
1888                raise service_error(service_error.internal, 
1889                        "Unexpected state for %s" % key)
1890        self.state_lock.release()
1891        return rv
1892
1893    def check_experiment_access(self, fid, key):
1894        """
1895        Confirm that the fid has access to the experiment.  Though a request
1896        may be made in terms of a local name, the access attribute is always
1897        the experiment's fedid.
1898        """
1899        if not isinstance(key, fedid):
1900            key = self.get_experiment_fedid(key)
1901
1902        if self.auth.check_attribute(fid, key):
1903            return True
1904        else:
1905            raise service_error(service_error.access, "Access Denied")
1906
1907
1908    def get_handler(self, path, fid):
1909        if self.auth.check_attribute(fid, path):
1910            return ("%s/%s" % (self.repodir, path), "application/binary")
1911        else:
1912            return (None, None)
1913
1914    def get_vtopo(self, req, fid):
1915        """
1916        Return the stored virtual topology for this experiment
1917        """
1918        rv = None
1919        state = None
1920
1921        req = req.get('VtopoRequestBody', None)
1922        if not req:
1923            raise service_error(service_error.req,
1924                    "Bad request format (no VtopoRequestBody)")
1925        exp = req.get('experiment', None)
1926        if exp:
1927            if exp.has_key('fedid'):
1928                key = exp['fedid']
1929                keytype = "fedid"
1930            elif exp.has_key('localname'):
1931                key = exp['localname']
1932                keytype = "localname"
1933            else:
1934                raise service_error(service_error.req, "Unknown lookup type")
1935        else:
1936            raise service_error(service_error.req, "No request?")
1937
1938        self.check_experiment_access(fid, key)
1939
1940        self.state_lock.acquire()
1941        if self.state.has_key(key):
1942            if self.state[key].has_key('vtopo'):
1943                rv = { 'experiment' : {keytype: key },\
1944                        'vtopo': self.state[key]['vtopo'],\
1945                    }
1946            else:
1947                state = self.state[key]['experimentStatus']
1948        self.state_lock.release()
1949
1950        if rv: return rv
1951        else: 
1952            if state:
1953                raise service_error(service_error.partial, 
1954                        "Not ready: %s" % state)
1955            else:
1956                raise service_error(service_error.req, "No such experiment")
1957
1958    def get_vis(self, req, fid):
1959        """
1960        Return the stored visualization for this experiment
1961        """
1962        rv = None
1963        state = None
1964
1965        req = req.get('VisRequestBody', None)
1966        if not req:
1967            raise service_error(service_error.req,
1968                    "Bad request format (no VisRequestBody)")
1969        exp = req.get('experiment', None)
1970        if exp:
1971            if exp.has_key('fedid'):
1972                key = exp['fedid']
1973                keytype = "fedid"
1974            elif exp.has_key('localname'):
1975                key = exp['localname']
1976                keytype = "localname"
1977            else:
1978                raise service_error(service_error.req, "Unknown lookup type")
1979        else:
1980            raise service_error(service_error.req, "No request?")
1981
1982        self.check_experiment_access(fid, key)
1983
1984        self.state_lock.acquire()
1985        if self.state.has_key(key):
1986            if self.state[key].has_key('vis'):
1987                rv =  { 'experiment' : {keytype: key },\
1988                        'vis': self.state[key]['vis'],\
1989                        }
1990            else:
1991                state = self.state[key]['experimentStatus']
1992        self.state_lock.release()
1993
1994        if rv: return rv
1995        else:
1996            if state:
1997                raise service_error(service_error.partial, 
1998                        "Not ready: %s" % state)
1999            else:
2000                raise service_error(service_error.req, "No such experiment")
2001
2002    def clean_info_response(self, rv):
2003        """
2004        Remove the information in the experiment's state object that is not in
2005        the info response.
2006        """
2007        # Remove the owner info (should always be there, but...)
2008        if rv.has_key('owner'): del rv['owner']
2009
2010        # Convert the log into the allocationLog parameter and remove the
2011        # log entry (with defensive programming)
2012        if rv.has_key('log'):
2013            rv['allocationLog'] = "".join(rv['log'])
2014            del rv['log']
2015        else:
2016            rv['allocationLog'] = ""
2017
2018        if rv['experimentStatus'] != 'active':
2019            if rv.has_key('federant'): del rv['federant']
2020        else:
2021            # remove the allocationID info from each federant
2022            for f in rv.get('federant', []):
2023                if f.has_key('allocID'): del f['allocID']
2024        return rv
2025
2026    def get_info(self, req, fid):
2027        """
2028        Return all the stored info about this experiment
2029        """
2030        rv = None
2031
2032        req = req.get('InfoRequestBody', None)
2033        if not req:
2034            raise service_error(service_error.req,
2035                    "Bad request format (no InfoRequestBody)")
2036        exp = req.get('experiment', None)
2037        if exp:
2038            if exp.has_key('fedid'):
2039                key = exp['fedid']
2040                keytype = "fedid"
2041            elif exp.has_key('localname'):
2042                key = exp['localname']
2043                keytype = "localname"
2044            else:
2045                raise service_error(service_error.req, "Unknown lookup type")
2046        else:
2047            raise service_error(service_error.req, "No request?")
2048
2049        self.check_experiment_access(fid, key)
2050
2051        # The state may be massaged by the service function that called
2052        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2053        # state.
2054        self.state_lock.acquire()
2055        if self.state.has_key(key):
2056            rv = copy.deepcopy(self.state[key])
2057        self.state_lock.release()
2058
2059        if rv:
2060            return self.clean_info_response(rv)
2061        else:
2062            raise service_error(service_error.req, "No such experiment")
2063
2064    def get_multi_info(self, req, fid):
2065        """
2066        Return all the stored info that this fedid can access
2067        """
2068        rv = { 'info': [ ] }
2069
2070        self.state_lock.acquire()
2071        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2072            self.check_experiment_access(fid, key)
2073
2074            if self.state.has_key(key):
2075                e = copy.deepcopy(self.state[key])
2076                e = self.clean_info_response(e)
2077                rv['info'].append(e)
2078        self.state_lock.release()
2079        return rv
2080
2081    def terminate_experiment(self, req, fid):
2082        """
2083        Swap this experiment out on the federants and delete the shared
2084        information
2085        """
2086        tbparams = { }
2087        req = req.get('TerminateRequestBody', None)
2088        if not req:
2089            raise service_error(service_error.req,
2090                    "Bad request format (no TerminateRequestBody)")
2091        force = req.get('force', False)
2092        exp = req.get('experiment', None)
2093        if exp:
2094            if exp.has_key('fedid'):
2095                key = exp['fedid']
2096                keytype = "fedid"
2097            elif exp.has_key('localname'):
2098                key = exp['localname']
2099                keytype = "localname"
2100            else:
2101                raise service_error(service_error.req, "Unknown lookup type")
2102        else:
2103            raise service_error(service_error.req, "No request?")
2104
2105        self.check_experiment_access(fid, key)
2106
2107        dealloc_list = [ ]
2108
2109
2110        # Create a logger that logs to the dealloc_list as well as to the main
2111        # log file.
2112        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2113        h = logging.StreamHandler(self.list_log(dealloc_list))
2114        # XXX: there should be a global one of these rather than repeating the
2115        # code.
2116        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2117                    '%d %b %y %H:%M:%S'))
2118        dealloc_log.addHandler(h)
2119
2120        self.state_lock.acquire()
2121        fed_exp = self.state.get(key, None)
2122
2123        if fed_exp:
2124            # This branch of the conditional holds the lock to generate a
2125            # consistent temporary tbparams variable to deallocate experiments.
2126            # It releases the lock to do the deallocations and reacquires it to
2127            # remove the experiment state when the termination is complete.
2128
2129            # First make sure that the experiment creation is complete.
2130            status = fed_exp.get('experimentStatus', None)
2131
2132            if status:
2133                if status in ('starting', 'terminating'):
2134                    if not force:
2135                        self.state_lock.release()
2136                        raise service_error(service_error.partial, 
2137                                'Experiment still being created or destroyed')
2138                    else:
2139                        self.log.warning('Experiment in %s state ' % status + \
2140                                'being terminated by force.')
2141            else:
2142                # No status??? trouble
2143                self.state_lock.release()
2144                raise service_error(service_error.internal,
2145                        "Experiment has no status!?")
2146
2147            ids = []
2148            #  experimentID is a list of dicts that are self-describing
2149            #  identifiers.  This finds all the fedids and localnames - the
2150            #  keys of self.state - and puts them into ids.
2151            for id in fed_exp.get('experimentID', []):
2152                if id.has_key('fedid'): ids.append(id['fedid'])
2153                if id.has_key('localname'): ids.append(id['localname'])
2154
2155            # Collect the allocation/segment ids
2156            for fed in fed_exp.get('federant', []):
2157                try:
2158                    tb = fed['emulab']['project']['testbed']['localname']
2159                    aid = fed['allocID']
2160                except KeyError, e:
2161                    continue
2162                tbparams[tb] = aid
2163            fed_exp['experimentStatus'] = 'terminating'
2164            if self.state_filename: self.write_state()
2165            self.state_lock.release()
2166
2167            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2168            # then completes, so we can't wait if nothing starts.  So, no
2169            # tbparams, no start.
2170            if len(tbparams) > 0:
2171                thread_pool = self.thread_pool(self.nthreads)
2172                for tb in tbparams.keys():
2173                    # Create and start a thread to stop the segment
2174                    thread_pool.wait_for_slot()
2175                    uri = self.tbmap.get(tb, None)
2176                    t  = self.pooled_thread(\
2177                            target=self.terminate_segment(log=dealloc_log,
2178                                testbed=tb,
2179                                cert_file=self.cert_file, 
2180                                cert_pwd=self.cert_pwd,
2181                                trusted_certs=self.trusted_certs,
2182                                caller=self.call_TerminateSegment),
2183                            args=(uri, tbparams[tb]), name=tb,
2184                            pdata=thread_pool, trace_file=self.trace_file)
2185                    t.start()
2186                # Wait for completions
2187                thread_pool.wait_for_all_done()
2188
2189            # release the allocations (failed experiments have done this
2190            # already, and starting experiments may be in odd states, so we
2191            # ignore errors releasing those allocations
2192            try: 
2193                for tb in tbparams.keys():
2194                    self.release_access(tb, tbparams[tb])
2195            except service_error, e:
2196                if status != 'failed' and not force:
2197                    raise e
2198
2199            # Remove the terminated experiment
2200            self.state_lock.acquire()
2201            for id in ids:
2202                if self.state.has_key(id): del self.state[id]
2203
2204            if self.state_filename: self.write_state()
2205            self.state_lock.release()
2206
2207            return { 
2208                    'experiment': exp , 
2209                    'deallocationLog': "".join(dealloc_list),
2210                    }
2211        else:
2212            # Don't forget to release the lock
2213            self.state_lock.release()
2214            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.