source: fedd/federation/experiment_control.py @ d20823f

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since d20823f was d20823f, checked in by Ted Faber <faber@…>, 14 years ago

This is very odd. Defaulting reqs in the federated_service constructor
resulted in all the instances sharing a reqs member (they all pointed to the
same thing). That's pretty non-intuitive, but seems to be the way python does
default parameter initialization. Beware.

  • Property mode set to 100644
File size: 80.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55    def __str__(self):
56        return "name %s export %s import %s params %s reqs %s" % \
57                (self.name, self.exporter, self.importers, self.params,
58                        [ (r['name'], r['visibility']) for r in self.reqs] )
59
60class experiment_control_local:
61    """
62    Control of experiments that this system can directly access.
63
64    Includes experiment creation, termination and information dissemination.
65    Thred safe.
66    """
67
68    class ssh_cmd_timeout(RuntimeError): pass
69   
70    class thread_pool:
71        """
72        A class to keep track of a set of threads all invoked for the same
73        task.  Manages the mutual exclusion of the states.
74        """
75        def __init__(self, nthreads):
76            """
77            Start a pool.
78            """
79            self.changed = Condition()
80            self.started = 0
81            self.terminated = 0
82            self.nthreads = nthreads
83
84        def acquire(self):
85            """
86            Get the pool's lock.
87            """
88            self.changed.acquire()
89
90        def release(self):
91            """
92            Release the pool's lock.
93            """
94            self.changed.release()
95
96        def wait(self, timeout = None):
97            """
98            Wait for a pool thread to start or stop.
99            """
100            self.changed.wait(timeout)
101
102        def start(self):
103            """
104            Called by a pool thread to report starting.
105            """
106            self.changed.acquire()
107            self.started += 1
108            self.changed.notifyAll()
109            self.changed.release()
110
111        def terminate(self):
112            """
113            Called by a pool thread to report finishing.
114            """
115            self.changed.acquire()
116            self.terminated += 1
117            self.changed.notifyAll()
118            self.changed.release()
119
120        def clear(self):
121            """
122            Clear all pool data.
123            """
124            self.changed.acquire()
125            self.started = 0
126            self.terminated =0
127            self.changed.notifyAll()
128            self.changed.release()
129
130        def wait_for_slot(self):
131            """
132            Wait until we have a free slot to start another pooled thread
133            """
134            self.acquire()
135            while self.started - self.terminated >= self.nthreads:
136                self.wait()
137            self.release()
138
139        def wait_for_all_done(self, timeout=None):
140            """
141            Wait until all active threads finish (and at least one has
142            started).  If a timeout is given, return after waiting that long
143            for termination.  If all threads are done (and one has started in
144            the since the last clear()) return True, otherwise False.
145            """
146            if timeout:
147                deadline = time.time() + timeout
148            self.acquire()
149            while self.started == 0 or self.started > self.terminated:
150                self.wait(timeout)
151                if timeout:
152                    if time.time() > deadline:
153                        break
154                    timeout = deadline - time.time()
155            self.release()
156            return not (self.started == 0 or self.started > self.terminated)
157
158    class pooled_thread(Thread):
159        """
160        One of a set of threads dedicated to a specific task.  Uses the
161        thread_pool class above for coordination.
162        """
163        def __init__(self, group=None, target=None, name=None, args=(), 
164                kwargs={}, pdata=None, trace_file=None):
165            Thread.__init__(self, group, target, name, args, kwargs)
166            self.rv = None          # Return value of the ops in this thread
167            self.exception = None   # Exception that terminated this thread
168            self.target=target      # Target function to run on start()
169            self.args = args        # Args to pass to target
170            self.kwargs = kwargs    # Additional kw args
171            self.pdata = pdata      # thread_pool for this class
172            # Logger for this thread
173            self.log = logging.getLogger("fedd.experiment_control")
174       
175        def run(self):
176            """
177            Emulate Thread.run, except add pool data manipulation and error
178            logging.
179            """
180            if self.pdata:
181                self.pdata.start()
182
183            if self.target:
184                try:
185                    self.rv = self.target(*self.args, **self.kwargs)
186                except service_error, s:
187                    self.exception = s
188                    self.log.error("Thread exception: %s %s" % \
189                            (s.code_string(), s.desc))
190                except:
191                    self.exception = sys.exc_info()[1]
192                    self.log.error(("Unexpected thread exception: %s" +\
193                            "Trace %s") % (self.exception,\
194                                traceback.format_exc()))
195            if self.pdata:
196                self.pdata.terminate()
197
198    call_RequestAccess = service_caller('RequestAccess')
199    call_ReleaseAccess = service_caller('ReleaseAccess')
200    call_StartSegment = service_caller('StartSegment')
201    call_TerminateSegment = service_caller('TerminateSegment')
202    call_Ns2Topdl = service_caller('Ns2Topdl')
203
204    def __init__(self, config=None, auth=None):
205        """
206        Intialize the various attributes, most from the config object
207        """
208
209        def parse_tarfile_list(tf):
210            """
211            Parse a tarfile list from the configuration.  This is a set of
212            paths and tarfiles separated by spaces.
213            """
214            rv = [ ]
215            if tf is not None:
216                tl = tf.split()
217                while len(tl) > 1:
218                    p, t = tl[0:2]
219                    del tl[0:2]
220                    rv.append((p, t))
221            return rv
222
223        self.thread_with_rv = experiment_control_local.pooled_thread
224        self.thread_pool = experiment_control_local.thread_pool
225        self.list_log = list_log.list_log
226
227        self.cert_file = config.get("experiment_control", "cert_file")
228        if self.cert_file:
229            self.cert_pwd = config.get("experiment_control", "cert_pwd")
230        else:
231            self.cert_file = config.get("globals", "cert_file")
232            self.cert_pwd = config.get("globals", "cert_pwd")
233
234        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
235                or config.get("globals", "trusted_certs")
236
237        self.repodir = config.get("experiment_control", "repodir")
238        self.repo_url = config.get("experiment_control", "repo_url", 
239                "https://users.isi.deterlab.net:23235");
240
241        self.exp_stem = "fed-stem"
242        self.log = logging.getLogger("fedd.experiment_control")
243        set_log_level(config, "experiment_control", self.log)
244        self.muxmax = 2
245        self.nthreads = 10
246        self.randomize_experiments = False
247
248        self.splitter = None
249        self.ssh_keygen = "/usr/bin/ssh-keygen"
250        self.ssh_identity_file = None
251
252
253        self.debug = config.getboolean("experiment_control", "create_debug")
254        self.cleanup = not config.getboolean("experiment_control", 
255                "leave_tmpfiles")
256        self.state_filename = config.get("experiment_control", 
257                "experiment_state")
258        self.store_filename = config.get("experiment_control", 
259                "synch_store")
260        self.store_url = config.get("experiment_control", "store_url")
261        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
262        self.fedkit = parse_tarfile_list(\
263                config.get("experiment_control", "fedkit"))
264        self.gatewaykit = parse_tarfile_list(\
265                config.get("experiment_control", "gatewaykit"))
266        accessdb_file = config.get("experiment_control", "accessdb")
267
268        self.ssh_pubkey_file = config.get("experiment_control", 
269                "ssh_pubkey_file")
270        self.ssh_privkey_file = config.get("experiment_control",
271                "ssh_privkey_file")
272        # NB for internal master/slave ops, not experiment setup
273        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
274
275        self.overrides = set([])
276        ovr = config.get('experiment_control', 'overrides')
277        if ovr:
278            for o in ovr.split(","):
279                o = o.strip()
280                if o.startswith('fedid:'): o = o[len('fedid:'):]
281                self.overrides.add(fedid(hexstr=o))
282
283        self.state = { }
284        self.state_lock = Lock()
285        self.tclsh = "/usr/local/bin/otclsh"
286        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
287                config.get("experiment_control", "tcl_splitter",
288                        "/usr/testbed/lib/ns2ir/parse.tcl")
289        mapdb_file = config.get("experiment_control", "mapdb")
290        self.trace_file = sys.stderr
291
292        self.def_expstart = \
293                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
294                "/tmp/federate";
295        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
296                "FEDDIR/hosts";
297        self.def_gwstart = \
298                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
299                "/tmp/bridge.log";
300        self.def_mgwstart = \
301                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
302                "/tmp/bridge.log";
303        self.def_gwimage = "FBSD61-TUNNEL2";
304        self.def_gwtype = "pc";
305        self.local_access = { }
306
307        if auth:
308            self.auth = auth
309        else:
310            self.log.error(\
311                    "[access]: No authorizer initialized, creating local one.")
312            auth = authorizer()
313
314
315        if self.ssh_pubkey_file:
316            try:
317                f = open(self.ssh_pubkey_file, 'r')
318                self.ssh_pubkey = f.read()
319                f.close()
320            except IOError:
321                raise service_error(service_error.internal,
322                        "Cannot read sshpubkey")
323        else:
324            raise service_error(service_error.internal, 
325                    "No SSH public key file?")
326
327        if not self.ssh_privkey_file:
328            raise service_error(service_error.internal, 
329                    "No SSH public key file?")
330
331
332        if mapdb_file:
333            self.read_mapdb(mapdb_file)
334        else:
335            self.log.warn("[experiment_control] No testbed map, using defaults")
336            self.tbmap = { 
337                    'deter':'https://users.isi.deterlab.net:23235',
338                    'emulab':'https://users.isi.deterlab.net:23236',
339                    'ucb':'https://users.isi.deterlab.net:23237',
340                    }
341
342        if accessdb_file:
343                self.read_accessdb(accessdb_file)
344        else:
345            raise service_error(service_error.internal,
346                    "No accessdb specified in config")
347
348        # Grab saved state.  OK to do this w/o locking because it's read only
349        # and only one thread should be in existence that can see self.state at
350        # this point.
351        if self.state_filename:
352            self.read_state()
353
354        if self.store_filename:
355            self.read_store()
356        else:
357            self.log.warning("No saved synch store")
358            self.synch_store = synch_store
359
360        # Dispatch tables
361        self.soap_services = {\
362                'New': soap_handler('New', self.new_experiment),
363                'Create': soap_handler('Create', self.create_experiment),
364                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
365                'Vis': soap_handler('Vis', self.get_vis),
366                'Info': soap_handler('Info', self.get_info),
367                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
368                'Terminate': soap_handler('Terminate', 
369                    self.terminate_experiment),
370                'GetValue': soap_handler('GetValue', self.GetValue),
371                'SetValue': soap_handler('SetValue', self.SetValue),
372        }
373
374        self.xmlrpc_services = {\
375                'New': xmlrpc_handler('New', self.new_experiment),
376                'Create': xmlrpc_handler('Create', self.create_experiment),
377                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
378                'Vis': xmlrpc_handler('Vis', self.get_vis),
379                'Info': xmlrpc_handler('Info', self.get_info),
380                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
381                'Terminate': xmlrpc_handler('Terminate',
382                    self.terminate_experiment),
383                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
384                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
385        }
386
387    # Call while holding self.state_lock
388    def write_state(self):
389        """
390        Write a new copy of experiment state after copying the existing state
391        to a backup.
392
393        State format is a simple pickling of the state dictionary.
394        """
395        if os.access(self.state_filename, os.W_OK):
396            copy_file(self.state_filename, \
397                    "%s.bak" % self.state_filename)
398        try:
399            f = open(self.state_filename, 'w')
400            pickle.dump(self.state, f)
401        except IOError, e:
402            self.log.error("Can't write file %s: %s" % \
403                    (self.state_filename, e))
404        except pickle.PicklingError, e:
405            self.log.error("Pickling problem: %s" % e)
406        except TypeError, e:
407            self.log.error("Pickling problem (TypeError): %s" % e)
408
409    @staticmethod
410    def get_alloc_ids(state):
411        """
412        Pull the fedids of the identifiers of each allocation from the
413        state.  Again, a dict dive that's best isolated.
414
415        Used by read_store and read state
416        """
417
418        return [ f['allocID']['fedid'] 
419                for f in state.get('federant',[]) \
420                    if f.has_key('allocID') and \
421                        f['allocID'].has_key('fedid')]
422
423    # Call while holding self.state_lock
424    def read_state(self):
425        """
426        Read a new copy of experiment state.  Old state is overwritten.
427
428        State format is a simple pickling of the state dictionary.
429        """
430       
431        def get_experiment_id(state):
432            """
433            Pull the fedid experimentID out of the saved state.  This is kind
434            of a gross walk through the dict.
435            """
436
437            if state.has_key('experimentID'):
438                for e in state['experimentID']:
439                    if e.has_key('fedid'):
440                        return e['fedid']
441                else:
442                    return None
443            else:
444                return None
445
446        try:
447            f = open(self.state_filename, "r")
448            self.state = pickle.load(f)
449            self.log.debug("[read_state]: Read state from %s" % \
450                    self.state_filename)
451        except IOError, e:
452            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
453                    % (self.state_filename, e))
454        except pickle.UnpicklingError, e:
455            self.log.warning(("[read_state]: No saved state: " + \
456                    "Unpickling failed: %s") % e)
457       
458        for s in self.state.values():
459            try:
460
461                eid = get_experiment_id(s)
462                if eid : 
463                    # Give the owner rights to the experiment
464                    self.auth.set_attribute(s['owner'], eid)
465                    # And holders of the eid as well
466                    self.auth.set_attribute(eid, eid)
467                    # allow overrides to control experiments as well
468                    for o in self.overrides:
469                        self.auth.set_attribute(o, eid)
470                    # Set permissions to allow reading of the software repo, if
471                    # any, as well.
472                    for a in self.get_alloc_ids(s):
473                        self.auth.set_attribute(a, 'repo/%s' % eid)
474                else:
475                    raise KeyError("No experiment id")
476            except KeyError, e:
477                self.log.warning("[read_state]: State ownership or identity " +\
478                        "misformatted in %s: %s" % (self.state_filename, e))
479
480
481    def read_accessdb(self, accessdb_file):
482        """
483        Read the mapping from fedids that can create experiments to their name
484        in the 3-level access namespace.  All will be asserted from this
485        testbed and can include the local username and porject that will be
486        asserted on their behalf by this fedd.  Each fedid is also added to the
487        authorization system with the "create" attribute.
488        """
489        self.accessdb = {}
490        # These are the regexps for parsing the db
491        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
492        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
493                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
494        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
495                "\s*->\s*(" + name_expr + ")\s*$")
496        lineno = 0
497
498        # Parse the mappings and store in self.authdb, a dict of
499        # fedid -> (proj, user)
500        try:
501            f = open(accessdb_file, "r")
502            for line in f:
503                lineno += 1
504                line = line.strip()
505                if len(line) == 0 or line.startswith('#'):
506                    continue
507                m = project_line.match(line)
508                if m:
509                    fid = fedid(hexstr=m.group(1))
510                    project, user = m.group(2,3)
511                    if not self.accessdb.has_key(fid):
512                        self.accessdb[fid] = []
513                    self.accessdb[fid].append((project, user))
514                    continue
515
516                m = user_line.match(line)
517                if m:
518                    fid = fedid(hexstr=m.group(1))
519                    project = None
520                    user = m.group(2)
521                    if not self.accessdb.has_key(fid):
522                        self.accessdb[fid] = []
523                    self.accessdb[fid].append((project, user))
524                    continue
525                self.log.warn("[experiment_control] Error parsing access " +\
526                        "db %s at line %d" %  (accessdb_file, lineno))
527        except IOError:
528            raise service_error(service_error.internal,
529                    ("Error opening/reading %s as experiment " +\
530                            "control accessdb") %  accessdb_file)
531        f.close()
532
533        # Initialize the authorization attributes
534        for fid in self.accessdb.keys():
535            self.auth.set_attribute(fid, 'create')
536            self.auth.set_attribute(fid, 'new')
537
538    def read_mapdb(self, file):
539        """
540        Read a simple colon separated list of mappings for the
541        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
542        """
543
544        self.tbmap = { }
545        lineno =0
546        try:
547            f = open(file, "r")
548            for line in f:
549                lineno += 1
550                line = line.strip()
551                if line.startswith('#') or len(line) == 0:
552                    continue
553                try:
554                    label, url = line.split(':', 1)
555                    self.tbmap[label] = url
556                except ValueError, e:
557                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
558                            "map db: %s %s" % (lineno, line, e))
559        except IOError, e:
560            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
561                    "open %s: %s" % (file, e))
562        f.close()
563
564    def read_store(self):
565        try:
566            self.synch_store = synch_store()
567            self.synch_store.load(self.store_filename)
568            self.log.debug("[read_store]: Read store from %s" % \
569                    self.store_filename)
570        except IOError, e:
571            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
572                    % (self.state_filename, e))
573            self.synch_store = synch_store()
574
575        # Set the initial permissions on data in the store.  XXX: This ad hoc
576        # authorization attribute initialization is getting out of hand.
577        for k in self.synch_store.all_keys():
578            try:
579                if k.startswith('fedid:'):
580                    fid = fedid(hexstr=k[6:46])
581                    if self.state.has_key(fid):
582                        for a in self.get_alloc_ids(self.state[fid]):
583                            self.auth.set_attribute(a, k)
584            except ValueError, e:
585                self.log.warn("Cannot deduce permissions for %s" % k)
586
587
588    def write_store(self):
589        """
590        Write a new copy of synch_store after writing current state
591        to a backup.  We use the internal synch_store pickle method to avoid
592        incinsistent data.
593
594        State format is a simple pickling of the store.
595        """
596        if os.access(self.store_filename, os.W_OK):
597            copy_file(self.store_filename, \
598                    "%s.bak" % self.store_filename)
599        try:
600            self.synch_store.save(self.store_filename)
601        except IOError, e:
602            self.log.error("Can't write file %s: %s" % \
603                    (self.store_filename, e))
604        except TypeError, e:
605            self.log.error("Pickling problem (TypeError): %s" % e)
606
607       
608    def generate_ssh_keys(self, dest, type="rsa" ):
609        """
610        Generate a set of keys for the gateways to use to talk.
611
612        Keys are of type type and are stored in the required dest file.
613        """
614        valid_types = ("rsa", "dsa")
615        t = type.lower();
616        if t not in valid_types: raise ValueError
617        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
618
619        try:
620            trace = open("/dev/null", "w")
621        except IOError:
622            raise service_error(service_error.internal,
623                    "Cannot open /dev/null??");
624
625        # May raise CalledProcessError
626        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
627        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
628        if rv != 0:
629            raise service_error(service_error.internal, 
630                    "Cannot generate nonce ssh keys.  %s return code %d" \
631                            % (self.ssh_keygen, rv))
632
633    def gentopo(self, str):
634        """
635        Generate the topology dtat structure from the splitter's XML
636        representation of it.
637
638        The topology XML looks like:
639            <experiment>
640                <nodes>
641                    <node><vname></vname><ips>ip1:ip2</ips></node>
642                </nodes>
643                <lans>
644                    <lan>
645                        <vname></vname><vnode></vnode><ip></ip>
646                        <bandwidth></bandwidth><member>node:port</member>
647                    </lan>
648                </lans>
649        """
650        class topo_parse:
651            """
652            Parse the topology XML and create the dats structure.
653            """
654            def __init__(self):
655                # Typing of the subelements for data conversion
656                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
657                self.int_subelements = ( 'bandwidth',)
658                self.float_subelements = ( 'delay',)
659                # The final data structure
660                self.nodes = [ ]
661                self.lans =  [ ]
662                self.topo = { \
663                        'node': self.nodes,\
664                        'lan' : self.lans,\
665                    }
666                self.element = { }  # Current element being created
667                self.chars = ""     # Last text seen
668
669            def end_element(self, name):
670                # After each sub element the contents is added to the current
671                # element or to the appropriate list.
672                if name == 'node':
673                    self.nodes.append(self.element)
674                    self.element = { }
675                elif name == 'lan':
676                    self.lans.append(self.element)
677                    self.element = { }
678                elif name in self.str_subelements:
679                    self.element[name] = self.chars
680                    self.chars = ""
681                elif name in self.int_subelements:
682                    self.element[name] = int(self.chars)
683                    self.chars = ""
684                elif name in self.float_subelements:
685                    self.element[name] = float(self.chars)
686                    self.chars = ""
687
688            def found_chars(self, data):
689                self.chars += data.rstrip()
690
691
692        tp = topo_parse();
693        parser = xml.parsers.expat.ParserCreate()
694        parser.EndElementHandler = tp.end_element
695        parser.CharacterDataHandler = tp.found_chars
696
697        parser.Parse(str)
698
699        return tp.topo
700       
701
702    def genviz(self, topo):
703        """
704        Generate the visualization the virtual topology
705        """
706
707        neato = "/usr/local/bin/neato"
708        # These are used to parse neato output and to create the visualization
709        # file.
710        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
711        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
712                "%s</type></node>"
713
714        try:
715            # Node names
716            nodes = [ n['vname'] for n in topo['node'] ]
717            topo_lans = topo['lan']
718        except KeyError, e:
719            raise service_error(service_error.internal, "Bad topology: %s" %e)
720
721        lans = { }
722        links = { }
723
724        # Walk through the virtual topology, organizing the connections into
725        # 2-node connections (links) and more-than-2-node connections (lans).
726        # When a lan is created, it's added to the list of nodes (there's a
727        # node in the visualization for the lan).
728        for l in topo_lans:
729            if links.has_key(l['vname']):
730                if len(links[l['vname']]) < 2:
731                    links[l['vname']].append(l['vnode'])
732                else:
733                    nodes.append(l['vname'])
734                    lans[l['vname']] = links[l['vname']]
735                    del links[l['vname']]
736                    lans[l['vname']].append(l['vnode'])
737            elif lans.has_key(l['vname']):
738                lans[l['vname']].append(l['vnode'])
739            else:
740                links[l['vname']] = [ l['vnode'] ]
741
742
743        # Open up a temporary file for dot to turn into a visualization
744        try:
745            df, dotname = tempfile.mkstemp()
746            dotfile = os.fdopen(df, 'w')
747        except IOError:
748            raise service_error(service_error.internal,
749                    "Failed to open file in genviz")
750
751        try:
752            dnull = open('/dev/null', 'w')
753        except IOError:
754            service_error(service_error.internal,
755                    "Failed to open /dev/null in genviz")
756
757        # Generate a dot/neato input file from the links, nodes and lans
758        try:
759            print >>dotfile, "graph G {"
760            for n in nodes:
761                print >>dotfile, '\t"%s"' % n
762            for l in links.keys():
763                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
764            for l in lans.keys():
765                for n in lans[l]:
766                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
767            print >>dotfile, "}"
768            dotfile.close()
769        except TypeError:
770            raise service_error(service_error.internal,
771                    "Single endpoint link in vtopo")
772        except IOError:
773            raise service_error(service_error.internal, "Cannot write dot file")
774
775        # Use dot to create a visualization
776        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
777                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
778                close_fds=True)
779        dnull.close()
780
781        # Translate dot to vis format
782        vis_nodes = [ ]
783        vis = { 'node': vis_nodes }
784        for line in dot.stdout:
785            m = vis_re.match(line)
786            if m:
787                vn = m.group(1)
788                vis_node = {'name': vn, \
789                        'x': float(m.group(2)),\
790                        'y' : float(m.group(3)),\
791                    }
792                if vn in links.keys() or vn in lans.keys():
793                    vis_node['type'] = 'lan'
794                else:
795                    vis_node['type'] = 'node'
796                vis_nodes.append(vis_node)
797        rv = dot.wait()
798
799        os.remove(dotname)
800        if rv == 0 : return vis
801        else: return None
802
803    def get_access(self, tb, nodes, tbparam, access_user, masters):
804        """
805        Get access to testbed through fedd and set the parameters for that tb
806        """
807        def get_export_project(svcs):
808            """
809            Look through for the list of federated_service for this testbed
810            objects for a project_export service, and extract the project
811            parameter.
812            """
813
814            pe = [s for s in svcs if s.name=='project_export']
815            if len(pe) == 1:
816                return pe[0].params.get('project', None)
817            elif len(pe) == 0:
818                return None
819            else:
820                raise service_error(service_error.req,
821                        "More than one project export is not supported")
822
823        uri = self.tbmap.get(testbed_base(tb), None)
824        if not uri:
825            raise service_error(service_error.server_config, 
826                    "Unknown testbed: %s" % tb)
827
828        export_svcs = masters.get(tb,[])
829        import_svcs = [ s for m in masters.values() \
830                for s in m \
831                    if tb in s.importers ]
832
833        export_project = get_export_project(export_svcs)
834
835        # Tweak search order so that if there are entries in access_user that
836        # have a project matching the export project, we try them first
837        if export_project: 
838            access_sequence = [ (p, u) for p, u in access_user \
839                    if p == export_project] 
840            access_sequence.extend([(p, u) for p, u in access_user \
841                    if p != export_project]) 
842        else: 
843            access_sequence = access_user
844
845        for p, u in access_sequence: 
846            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
847                    "to %s") %  ((p or "None"), u, uri))
848
849            if p:
850                # Request with user and project specified
851                req = {\
852                        'destinationTestbed' : { 'uri' : uri },
853                        'credential': [ "project: %s" % p, "user: %s"  % u],
854                        'allocID' : { 'localname': 'test' },
855                    }
856            else:
857                # Request with only user specified
858                req = {\
859                        'destinationTestbed' : { 'uri' : uri },
860                        'credential': [ 'user: %s' % u ],
861                        'allocID' : { 'localname': 'test' },
862                    }
863
864            # Make the service request from the services we're importing and
865            # exporting.  Keep track of the export request ids so we can
866            # collect the resulting info from the access response.
867            e_keys = { }
868            if import_svcs or export_svcs:
869                req['service'] = [ ]
870
871                for i, s in enumerate(import_svcs):
872                    idx = 'import%d' % i
873                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
874                    if s.params:
875                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
876                                for k, v in s.params.items()]
877                    req['service'].append(sr)
878
879                for i, s in enumerate(export_svcs):
880                    idx = 'export%d' % i
881                    e_keys[idx] = s
882                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
883                    if s.params:
884                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
885                                for k, v in s.params.items()]
886                    req['service'].append(sr)
887
888            # node resources if any
889            if nodes != None and len(nodes) > 0:
890                rnodes = [ ]
891                for n in nodes:
892                    rn = { }
893                    image, hw, count = n.split(":")
894                    if image: rn['image'] = [ image ]
895                    if hw: rn['hardware'] = [ hw ]
896                    if count and int(count) >0 : rn['count'] = int(count)
897                    rnodes.append(rn)
898                req['resources']= { }
899                req['resources']['node'] = rnodes
900
901            try:
902                if self.local_access.has_key(uri):
903                    # Local access call
904                    req = { 'RequestAccessRequestBody' : req }
905                    r = self.local_access[uri].RequestAccess(req, 
906                            fedid(file=self.cert_file))
907                    r = { 'RequestAccessResponseBody' : r }
908                else:
909                    r = self.call_RequestAccess(uri, req, 
910                            self.cert_file, self.cert_pwd, self.trusted_certs)
911            except service_error, e:
912                if e.code == service_error.access:
913                    self.log.debug("[get_access] Access denied")
914                    r = None
915                    continue
916                else:
917                    raise e
918
919            if r.has_key('RequestAccessResponseBody'):
920                # Through to here we have a valid response, not a fault.
921                # Access denied is a fault, so something better or worse than
922                # access denied has happened.
923                r = r['RequestAccessResponseBody']
924                self.log.debug("[get_access] Access granted")
925                break
926            else:
927                raise service_error(service_error.protocol,
928                        "Bad proxy response")
929       
930        if not r:
931            raise service_error(service_error.access, 
932                    "Access denied by %s (%s)" % (tb, uri))
933
934        tbparam[tb] = { 
935                "allocID" : r['allocID'],
936                "uri": uri,
937                }
938
939        # Collect the responses corresponding to the services this testbed
940        # exports.  These will be the service requests that we will include in
941        # the start segment requests (with appropriate visibility values) to
942        # import and export the segments.
943        for s in r.get('service', []):
944            id = s.get('id', None)
945            if id and id in e_keys:
946                e_keys[id].reqs.append(s)
947
948        # Add attributes to parameter space.  We don't allow attributes to
949        # overlay any parameters already installed.
950        for a in r.get('fedAttr', []):
951            try:
952                if a['attribute'] and \
953                        isinstance(a['attribute'], basestring)\
954                        and not tbparam[tb].has_key(a['attribute'].lower()):
955                    tbparam[tb][a['attribute'].lower()] = a['value']
956            except KeyError:
957                self.log.error("Bad attribute in response: %s" % a)
958
959    def release_access(self, tb, aid, uri=None):
960        """
961        Release access to testbed through fedd
962        """
963
964        if not uri:
965            uri = self.tbmap.get(tb, None)
966        if not uri:
967            raise service_error(service_error.server_config, 
968                    "Unknown testbed: %s" % tb)
969
970        if self.local_access.has_key(uri):
971            resp = self.local_access[uri].ReleaseAccess(\
972                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
973                    fedid(file=self.cert_file))
974            resp = { 'ReleaseAccessResponseBody': resp } 
975        else:
976            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
977                    self.cert_file, self.cert_pwd, self.trusted_certs)
978
979        # better error coding
980
981    def remote_ns2topdl(self, uri, desc):
982
983        req = {
984                'description' : { 'ns2description': desc },
985            }
986
987        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
988                self.trusted_certs)
989
990        if r.has_key('Ns2TopdlResponseBody'):
991            r = r['Ns2TopdlResponseBody']
992            ed = r.get('experimentdescription', None)
993            if ed.has_key('topdldescription'):
994                return topdl.Topology(**ed['topdldescription'])
995            else:
996                raise service_error(service_error.protocol, 
997                        "Bad splitter response (no output)")
998        else:
999            raise service_error(service_error.protocol, "Bad splitter response")
1000
1001    class start_segment:
1002        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1003                cert_pwd=None, trusted_certs=None, caller=None,
1004                log_collector=None):
1005            self.log = log
1006            self.debug = debug
1007            self.cert_file = cert_file
1008            self.cert_pwd = cert_pwd
1009            self.trusted_certs = None
1010            self.caller = caller
1011            self.testbed = testbed
1012            self.log_collector = log_collector
1013            self.response = None
1014            self.node = { }
1015
1016        def make_map(self, resp):
1017            if 'segmentdescription' in resp and \
1018                    'topdldescription' in resp['segmentdescription']:
1019                top = topdl.Topology(\
1020                        **resp['segmentdescription']['topdldescription'])
1021                for e in [e for e in top.elements \
1022                        if isinstance(e, topdl.Computer)]:
1023                    hn = e.get_attribute('hostname')
1024                    if hn:
1025                        for n in e.name:
1026                            self.node[n] = hn
1027
1028        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1029            req = {
1030                    'allocID': { 'fedid' : aid }, 
1031                    'segmentdescription': { 
1032                        'topdldescription': topo.to_dict(),
1033                    },
1034                }
1035
1036            if connInfo:
1037                req['connection'] = connInfo
1038
1039            import_svcs = [ s for m in masters.values() \
1040                    for s in m if self.testbed in s.importers]
1041
1042            if import_svcs or self.testbed in masters:
1043                req['service'] = []
1044
1045            for s in import_svcs:
1046                for r in s.reqs:
1047                    sr = copy.deepcopy(r)
1048                    sr['visibility'] = 'import';
1049                    req['service'].append(sr)
1050
1051            for s in masters.get(self.testbed, []):
1052                for r in s.reqs:
1053                    sr = copy.deepcopy(r)
1054                    sr['visibility'] = 'export';
1055                    req['service'].append(sr)
1056
1057            if attrs:
1058                req['fedAttr'] = attrs
1059
1060            try:
1061                self.log.debug("Calling StartSegment at %s " % uri)
1062                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1063                        self.trusted_certs)
1064                if r.has_key('StartSegmentResponseBody'):
1065                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1066                            None)
1067                    if lval and self.log_collector:
1068                        for line in  lval.splitlines(True):
1069                            self.log_collector.write(line)
1070                    self.make_map(r['StartSegmentResponseBody'])
1071                    self.response = r
1072                else:
1073                    raise service_error(service_error.internal, 
1074                            "Bad response!?: %s" %r)
1075                return True
1076            except service_error, e:
1077                self.log.error("Start segment failed on %s: %s" % \
1078                        (self.testbed, e))
1079                return False
1080
1081
1082
1083    class terminate_segment:
1084        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1085                cert_pwd=None, trusted_certs=None, caller=None):
1086            self.log = log
1087            self.debug = debug
1088            self.cert_file = cert_file
1089            self.cert_pwd = cert_pwd
1090            self.trusted_certs = None
1091            self.caller = caller
1092            self.testbed = testbed
1093
1094        def __call__(self, uri, aid ):
1095            req = {
1096                    'allocID': aid , 
1097                }
1098            try:
1099                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1100                        self.trusted_certs)
1101                return True
1102            except service_error, e:
1103                self.log.error("Terminate segment failed on %s: %s" % \
1104                        (self.testbed, e))
1105                return False
1106   
1107
1108    def allocate_resources(self, allocated, masters, eid, expid, 
1109            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1110            attrs=None, connInfo={}):
1111
1112        started = { }           # Testbeds where a sub-experiment started
1113                                # successfully
1114
1115        # XXX
1116        fail_soft = False
1117
1118        log = alloc_log or self.log
1119
1120        thread_pool = self.thread_pool(self.nthreads)
1121        threads = [ ]
1122        starters = [ ]
1123
1124        for tb in allocated.keys():
1125            # Create and start a thread to start the segment, and save it
1126            # to get the return value later
1127            tb_attrs = copy.copy(attrs)
1128            thread_pool.wait_for_slot()
1129            uri = tbparams[tb].get('uri', \
1130                    self.tbmap.get(testbed_base(tb), None))
1131            base, suffix = split_testbed(tb)
1132            if suffix:
1133                tb_attrs.append({'attribute': 'experiment_name', 
1134                    'value': "%s_%s" % (eid, suffix)})
1135            else:
1136                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1137            if not uri:
1138                raise service_error(service_error.internal, 
1139                        "Unknown testbed %s !?" % tb)
1140
1141            if tbparams[tb].has_key('allocID') and \
1142                    tbparams[tb]['allocID'].has_key('fedid'):
1143                aid = tbparams[tb]['allocID']['fedid']
1144            else:
1145                raise service_error(service_error.internal, 
1146                        "No alloc id for testbed %s !?" % tb)
1147
1148            s = self.start_segment(log=log, debug=self.debug,
1149                    testbed=tb, cert_file=self.cert_file,
1150                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1151                    caller=self.call_StartSegment,
1152                    log_collector=log_collector)
1153            starters.append(s)
1154            t  = self.pooled_thread(\
1155                    target=s, name=tb,
1156                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1157                    pdata=thread_pool, trace_file=self.trace_file)
1158            threads.append(t)
1159            t.start()
1160
1161        # Wait until all finish (keep pinging the log, though)
1162        mins = 0
1163        revoked = False
1164        while not thread_pool.wait_for_all_done(60.0):
1165            mins += 1
1166            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1167                    % mins)
1168            if not revoked and \
1169                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1170                # a testbed has failed.  Revoke this experiment's
1171                # synchronizarion values so that sub experiments will not
1172                # deadlock waiting for synchronization that will never happen
1173                self.log.info("A subexperiment has failed to swap in, " + \
1174                        "revoking synch keys")
1175                var_key = "fedid:%s" % expid
1176                for k in self.synch_store.all_keys():
1177                    if len(k) > 45 and k[0:46] == var_key:
1178                        self.synch_store.revoke_key(k)
1179                revoked = True
1180
1181        failed = [ t.getName() for t in threads if not t.rv ]
1182        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1183
1184        # If one failed clean up, unless fail_soft is set
1185        if failed:
1186            if not fail_soft:
1187                thread_pool.clear()
1188                for tb in succeeded:
1189                    # Create and start a thread to stop the segment
1190                    thread_pool.wait_for_slot()
1191                    uri = tbparams[tb]['uri']
1192                    t  = self.pooled_thread(\
1193                            target=self.terminate_segment(log=log,
1194                                testbed=tb,
1195                                cert_file=self.cert_file, 
1196                                cert_pwd=self.cert_pwd,
1197                                trusted_certs=self.trusted_certs,
1198                                caller=self.call_TerminateSegment),
1199                            args=(uri, tbparams[tb]['federant']['allocID']),
1200                            name=tb,
1201                            pdata=thread_pool, trace_file=self.trace_file)
1202                    t.start()
1203                # Wait until all finish (if any are being stopped)
1204                if succeeded:
1205                    thread_pool.wait_for_all_done()
1206
1207                # release the allocations
1208                for tb in tbparams.keys():
1209                    self.release_access(tb, tbparams[tb]['allocID'],
1210                            tbparams[tb].get('uri', None))
1211                # Remove the placeholder
1212                self.state_lock.acquire()
1213                self.state[eid]['experimentStatus'] = 'failed'
1214                if self.state_filename: self.write_state()
1215                self.state_lock.release()
1216
1217                log.error("Swap in failed on %s" % ",".join(failed))
1218                return
1219        else:
1220            # Walk through the successes and gather the virtual to physical
1221            # mapping.
1222            node = { }
1223            for s in starters:
1224                node.update(s.node)
1225            # Assigng the mapping as a hostname attribute
1226            for e in [ e for e in top.elements \
1227                    if isinstance(e, topdl.Computer)]:
1228                for n in e.name:
1229                    if n in node:
1230                        e.set_attribute('hostname', node[n])
1231            log.info("[start_segment]: Experiment %s active" % eid)
1232
1233
1234        # Walk up tmpdir, deleting as we go
1235        if self.cleanup:
1236            log.debug("[start_experiment]: removing %s" % tmpdir)
1237            for path, dirs, files in os.walk(tmpdir, topdown=False):
1238                for f in files:
1239                    os.remove(os.path.join(path, f))
1240                for d in dirs:
1241                    os.rmdir(os.path.join(path, d))
1242            os.rmdir(tmpdir)
1243        else:
1244            log.debug("[start_experiment]: not removing %s" % tmpdir)
1245
1246        # Insert the experiment into our state and update the disk copy.
1247        self.state_lock.acquire()
1248        self.state[expid]['experimentStatus'] = 'active'
1249        self.state[eid] = self.state[expid]
1250        self.state[eid]['experimentdescription']['topdldescription'] = \
1251                top.to_dict()
1252        if self.state_filename: self.write_state()
1253        self.state_lock.release()
1254        return
1255
1256
1257    def add_kit(self, e, kit):
1258        """
1259        Add a Software object created from the list of (install, location)
1260        tuples passed as kit  to the software attribute of an object e.  We
1261        do this enough to break out the code, but it's kind of a hack to
1262        avoid changing the old tuple rep.
1263        """
1264
1265        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1266
1267        if isinstance(e.software, list): e.software.extend(s)
1268        else: e.software = s
1269
1270
1271    def create_experiment_state(self, fid, req, expid, expcert,
1272            state='starting'):
1273        """
1274        Create the initial entry in the experiment's state.  The expid and
1275        expcert are the experiment's fedid and certifacte that represents that
1276        ID, which are installed in the experiment state.  If the request
1277        includes a suggested local name that is used if possible.  If the local
1278        name is already taken by an experiment owned by this user that has
1279        failed, it is overwritten.  Otherwise new letters are added until a
1280        valid localname is found.  The generated local name is returned.
1281        """
1282
1283        if req.has_key('experimentID') and \
1284                req['experimentID'].has_key('localname'):
1285            overwrite = False
1286            eid = req['experimentID']['localname']
1287            # If there's an old failed experiment here with the same local name
1288            # and accessible by this user, we'll overwrite it, otherwise we'll
1289            # fall through and do the collision avoidance.
1290            old_expid = self.get_experiment_fedid(eid)
1291            if old_expid and self.check_experiment_access(fid, old_expid):
1292                self.state_lock.acquire()
1293                status = self.state[eid].get('experimentStatus', None)
1294                if status and status == 'failed':
1295                    # remove the old access attribute
1296                    self.auth.unset_attribute(fid, old_expid)
1297                    overwrite = True
1298                    del self.state[eid]
1299                    del self.state[old_expid]
1300                self.state_lock.release()
1301            self.state_lock.acquire()
1302            while (self.state.has_key(eid) and not overwrite):
1303                eid += random.choice(string.ascii_letters)
1304            # Initial state
1305            self.state[eid] = {
1306                    'experimentID' : \
1307                            [ { 'localname' : eid }, {'fedid': expid } ],
1308                    'experimentStatus': state,
1309                    'experimentAccess': { 'X509' : expcert },
1310                    'owner': fid,
1311                    'log' : [],
1312                }
1313            self.state[expid] = self.state[eid]
1314            if self.state_filename: self.write_state()
1315            self.state_lock.release()
1316        else:
1317            eid = self.exp_stem
1318            for i in range(0,5):
1319                eid += random.choice(string.ascii_letters)
1320            self.state_lock.acquire()
1321            while (self.state.has_key(eid)):
1322                eid = self.exp_stem
1323                for i in range(0,5):
1324                    eid += random.choice(string.ascii_letters)
1325            # Initial state
1326            self.state[eid] = {
1327                    'experimentID' : \
1328                            [ { 'localname' : eid }, {'fedid': expid } ],
1329                    'experimentStatus': state,
1330                    'experimentAccess': { 'X509' : expcert },
1331                    'owner': fid,
1332                    'log' : [],
1333                }
1334            self.state[expid] = self.state[eid]
1335            if self.state_filename: self.write_state()
1336            self.state_lock.release()
1337
1338        return eid
1339
1340
1341    def allocate_ips_to_topo(self, top):
1342        """
1343        Add an ip4_address attribute to all the hosts in the topology, based on
1344        the shared substrates on which they sit.  An /etc/hosts file is also
1345        created and returned as a list of hostfiles entries.  We also return
1346        the allocator, because we may need to allocate IPs to portals
1347        (specifically DRAGON portals).
1348        """
1349        subs = sorted(top.substrates, 
1350                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1351                reverse=True)
1352        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1353        ifs = { }
1354        hosts = [ ]
1355
1356        for idx, s in enumerate(subs):
1357            net_size = len(s.interfaces)+2
1358
1359            a = ips.allocate(net_size)
1360            if a :
1361                base, num = a
1362                if num < net_size: 
1363                    raise service_error(service_error.internal,
1364                            "Allocator returned wrong number of IPs??")
1365            else:
1366                raise service_error(service_error.req, 
1367                        "Cannot allocate IP addresses")
1368            mask = ips.min_alloc
1369            while mask < net_size:
1370                mask *= 2
1371
1372            netmask = ((2**32-1) ^ (mask-1))
1373
1374            base += 1
1375            for i in s.interfaces:
1376                i.attribute.append(
1377                        topdl.Attribute('ip4_address', 
1378                            "%s" % ip_addr(base)))
1379                i.attribute.append(
1380                        topdl.Attribute('ip4_netmask', 
1381                            "%s" % ip_addr(int(netmask))))
1382
1383                hname = i.element.name[0]
1384                if ifs.has_key(hname):
1385                    hosts.append("%s\t%s-%s %s-%d" % \
1386                            (ip_addr(base), hname, s.name, hname,
1387                                ifs[hname]))
1388                else:
1389                    ifs[hname] = 0
1390                    hosts.append("%s\t%s-%s %s-%d %s" % \
1391                            (ip_addr(base), hname, s.name, hname,
1392                                ifs[hname], hname))
1393
1394                ifs[hname] += 1
1395                base += 1
1396        return hosts, ips
1397
1398    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1399            tbparams, masters):
1400        """
1401        Request access to the various testbeds required for this instantiation
1402        (passed in as testbeds).  User, access_user, expoert_project and master
1403        are used to construct the correct requests.  Per-testbed parameters are
1404        returned in tbparams.
1405        """
1406        for tb in testbeds:
1407            self.get_access(tb, None, tbparams, access_user, masters)
1408            allocated[tb] = 1
1409
1410    def split_topology(self, top, topo, testbeds):
1411        """
1412        Create the sub-topologies that are needed for experiment instantiation.
1413        """
1414        for tb in testbeds:
1415            topo[tb] = top.clone()
1416            # copy in for loop allows deletions from the original
1417            for e in [ e for e in topo[tb].elements]:
1418                etb = e.get_attribute('testbed')
1419                # NB: elements without a testbed attribute won't appear in any
1420                # sub topologies. 
1421                if not etb or etb != tb:
1422                    for i in e.interface:
1423                        for s in i.subs:
1424                            try:
1425                                s.interfaces.remove(i)
1426                            except ValueError:
1427                                raise service_error(service_error.internal,
1428                                        "Can't remove interface??")
1429                    topo[tb].elements.remove(e)
1430            topo[tb].make_indices()
1431
1432    def wrangle_software(self, expid, top, topo, tbparams):
1433        """
1434        Copy software out to the repository directory, allocate permissions and
1435        rewrite the segment topologies to look for the software in local
1436        places.
1437        """
1438
1439        # Copy the rpms and tarfiles to a distribution directory from
1440        # which the federants can retrieve them
1441        linkpath = "%s/software" %  expid
1442        softdir ="%s/%s" % ( self.repodir, linkpath)
1443        softmap = { }
1444        # These are in a list of tuples format (each kit).  This comprehension
1445        # unwraps them into a single list of tuples that initilaizes the set of
1446        # tuples.
1447        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1448                for p, t in l ])
1449        pkgs.update([x.location for e in top.elements \
1450                for x in e.software])
1451        try:
1452            os.makedirs(softdir)
1453        except IOError, e:
1454            raise service_error(
1455                    "Cannot create software directory: %s" % e)
1456        # The actual copying.  Everything's converted into a url for copying.
1457        for pkg in pkgs:
1458            loc = pkg
1459
1460            scheme, host, path = urlparse(loc)[0:3]
1461            dest = os.path.basename(path)
1462            if not scheme:
1463                if not loc.startswith('/'):
1464                    loc = "/%s" % loc
1465                loc = "file://%s" %loc
1466            try:
1467                u = urlopen(loc)
1468            except Exception, e:
1469                raise service_error(service_error.req, 
1470                        "Cannot open %s: %s" % (loc, e))
1471            try:
1472                f = open("%s/%s" % (softdir, dest) , "w")
1473                self.log.debug("Writing %s/%s" % (softdir,dest) )
1474                data = u.read(4096)
1475                while data:
1476                    f.write(data)
1477                    data = u.read(4096)
1478                f.close()
1479                u.close()
1480            except Exception, e:
1481                raise service_error(service_error.internal,
1482                        "Could not copy %s: %s" % (loc, e))
1483            path = re.sub("/tmp", "", linkpath)
1484            # XXX
1485            softmap[pkg] = \
1486                    "%s/%s/%s" %\
1487                    ( self.repo_url, path, dest)
1488
1489            # Allow the individual segments to access the software.
1490            for tb in tbparams.keys():
1491                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1492                        "/%s/%s" % ( path, dest))
1493
1494        # Convert the software locations in the segments into the local
1495        # copies on this host
1496        for soft in [ s for tb in topo.values() \
1497                for e in tb.elements \
1498                    if getattr(e, 'software', False) \
1499                        for s in e.software ]:
1500            if softmap.has_key(soft.location):
1501                soft.location = softmap[soft.location]
1502
1503
1504    def new_experiment(self, req, fid):
1505        """
1506        The external interface to empty initial experiment creation called from
1507        the dispatcher.
1508
1509        Creates a working directory, splits the incoming description using the
1510        splitter script and parses out the avrious subsections using the
1511        lcasses above.  Once each sub-experiment is created, use pooled threads
1512        to instantiate them and start it all up.
1513        """
1514        if not self.auth.check_attribute(fid, 'new'):
1515            raise service_error(service_error.access, "New access denied")
1516
1517        try:
1518            tmpdir = tempfile.mkdtemp(prefix="split-")
1519        except IOError:
1520            raise service_error(service_error.internal, "Cannot create tmp dir")
1521
1522        try:
1523            access_user = self.accessdb[fid]
1524        except KeyError:
1525            raise service_error(service_error.internal,
1526                    "Access map and authorizer out of sync in " + \
1527                            "new_experiment for fedid %s"  % fid)
1528
1529        pid = "dummy"
1530        gid = "dummy"
1531
1532        req = req.get('NewRequestBody', None)
1533        if not req:
1534            raise service_error(service_error.req,
1535                    "Bad request format (no NewRequestBody)")
1536
1537        # Generate an ID for the experiment (slice) and a certificate that the
1538        # allocator can use to prove they own it.  We'll ship it back through
1539        # the encrypted connection.
1540        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1541
1542        #now we're done with the tmpdir, and it should be empty
1543        if self.cleanup:
1544            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1545            os.rmdir(tmpdir)
1546        else:
1547            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1548
1549        eid = self.create_experiment_state(fid, req, expid, expcert, 
1550                state='empty')
1551
1552        # Let users touch the state
1553        self.auth.set_attribute(fid, expid)
1554        self.auth.set_attribute(expid, expid)
1555        # Override fedids can manipulate state as well
1556        for o in self.overrides:
1557            self.auth.set_attribute(o, expid)
1558
1559        rv = {
1560                'experimentID': [
1561                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1562                ],
1563                'experimentStatus': 'empty',
1564                'experimentAccess': { 'X509' : expcert }
1565            }
1566
1567        return rv
1568
1569    def create_experiment(self, req, fid):
1570        """
1571        The external interface to experiment creation called from the
1572        dispatcher.
1573
1574        Creates a working directory, splits the incoming description using the
1575        splitter script and parses out the various subsections using the
1576        classes above.  Once each sub-experiment is created, use pooled threads
1577        to instantiate them and start it all up.
1578        """
1579
1580        req = req.get('CreateRequestBody', None)
1581        if not req:
1582            raise service_error(service_error.req,
1583                    "Bad request format (no CreateRequestBody)")
1584
1585        # Get the experiment access
1586        exp = req.get('experimentID', None)
1587        if exp:
1588            if exp.has_key('fedid'):
1589                key = exp['fedid']
1590                expid = key
1591                eid = None
1592            elif exp.has_key('localname'):
1593                key = exp['localname']
1594                eid = key
1595                expid = None
1596            else:
1597                raise service_error(service_error.req, "Unknown lookup type")
1598        else:
1599            raise service_error(service_error.req, "No request?")
1600
1601        self.check_experiment_access(fid, key)
1602
1603        try:
1604            tmpdir = tempfile.mkdtemp(prefix="split-")
1605            os.mkdir(tmpdir+"/keys")
1606        except IOError:
1607            raise service_error(service_error.internal, "Cannot create tmp dir")
1608
1609        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1610        gw_secretkey_base = "fed.%s" % self.ssh_type
1611        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1612        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1613        tclfile = tmpdir + "/experiment.tcl"
1614        tbparams = { }
1615        try:
1616            access_user = self.accessdb[fid]
1617        except KeyError:
1618            raise service_error(service_error.internal,
1619                    "Access map and authorizer out of sync in " + \
1620                            "create_experiment for fedid %s"  % fid)
1621
1622        pid = "dummy"
1623        gid = "dummy"
1624
1625        # The tcl parser needs to read a file so put the content into that file
1626        descr=req.get('experimentdescription', None)
1627        if descr:
1628            file_content=descr.get('ns2description', None)
1629            if file_content:
1630                try:
1631                    f = open(tclfile, 'w')
1632                    f.write(file_content)
1633                    f.close()
1634                except IOError:
1635                    raise service_error(service_error.internal,
1636                            "Cannot write temp experiment description")
1637            else:
1638                raise service_error(service_error.req, 
1639                        "Only ns2descriptions supported")
1640        else:
1641            raise service_error(service_error.req, "No experiment description")
1642
1643        self.state_lock.acquire()
1644        if self.state.has_key(key):
1645            self.state[key]['experimentStatus'] = "starting"
1646            for e in self.state[key].get('experimentID',[]):
1647                if not expid and e.has_key('fedid'):
1648                    expid = e['fedid']
1649                elif not eid and e.has_key('localname'):
1650                    eid = e['localname']
1651        self.state_lock.release()
1652
1653        if not (eid and expid):
1654            raise service_error(service_error.internal, 
1655                    "Cannot find local experiment info!?")
1656
1657        try: 
1658            # This catches exceptions to clear the placeholder if necessary
1659            try:
1660                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1661            except ValueError:
1662                raise service_error(service_error.server_config, 
1663                        "Bad key type (%s)" % self.ssh_type)
1664
1665            # Copy the service request
1666            tb_services = [ s for s in req.get('service',[]) ]
1667            # Translate to topdl
1668            if self.splitter_url:
1669                self.log.debug("Calling remote topdl translator at %s" % \
1670                        self.splitter_url)
1671                top = self.remote_ns2topdl(self.splitter_url, file_content)
1672            else:
1673                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1674                    str(self.muxmax), '-m', 'dummy']
1675
1676                tclcmd.extend([pid, gid, eid, tclfile])
1677
1678                self.log.debug("running local splitter %s", " ".join(tclcmd))
1679                # This is just fantastic.  As a side effect the parser copies
1680                # tb_compat.tcl into the current directory, so that directory
1681                # must be writable by the fedd user.  Doing this in the
1682                # temporary subdir ensures this is the case.
1683                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1684                        cwd=tmpdir)
1685                split_data = tclparser.stdout
1686
1687                top = topdl.topology_from_xml(file=split_data, top="experiment")
1688
1689            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1690             # Find the testbeds to look up
1691            testbeds = set([ a.value for e in top.elements \
1692                    for a in e.attribute \
1693                        if a.attribute == 'testbed'])
1694
1695            masters = { }           # testbeds exporting services
1696            for s in tb_services:
1697                # If this is a project_export request with the importall field
1698                # set, fill it out.
1699
1700                if s.get('importall', False):
1701                    s['import'] = [ tb for tb in testbeds \
1702                            if tb not in s.get('export',[])]
1703                    del s['importall']
1704
1705                # Add the service to masters
1706                for tb in s.get('export', []):
1707                    if s.get('name', None):
1708                        if tb not in masters:
1709                            masters[tb] = [ ]
1710
1711                        params = { }
1712                        if 'fedAttr' in s:
1713                            for a in s['fedAttr']:
1714                                params[a.get('attribute', '')] = \
1715                                        a.get('value','')
1716
1717                        masters[tb].append(federated_service(name=s['name'],
1718                                exporter=tb, importers=s.get('import',[]),
1719                                params=params, reqs=[]))
1720                    else:
1721                        self.log.error('Testbed service does not have name " + \
1722                                "and importers')
1723
1724
1725            allocated = { }         # Testbeds we can access
1726            topo ={ }               # Sub topologies
1727            connInfo = { }          # Connection information
1728            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1729                    tbparams, masters)
1730
1731            self.split_topology(top, topo, testbeds)
1732
1733            # Copy configuration files into the remote file store
1734            # The config urlpath
1735            configpath = "/%s/config" % expid
1736            # The config file system location
1737            configdir ="%s%s" % ( self.repodir, configpath)
1738            try:
1739                os.makedirs(configdir)
1740            except EnvironmentError, e:
1741                raise service_error(service_error.internal,
1742                        "Cannot create config directory: %s" % e)
1743            try:
1744                f = open("%s/hosts" % configdir, "w")
1745                f.write('\n'.join(hosts))
1746                f.close()
1747            except IOError, e:
1748                raise service_error(service_error.internal, 
1749                        "Cannot write hosts file: %s" % e)
1750            try:
1751                copy_file("%s" % gw_pubkey, "%s/%s" % \
1752                        (configdir, gw_pubkey_base))
1753                copy_file("%s" % gw_secretkey, "%s/%s" % \
1754                        (configdir, gw_secretkey_base))
1755            except IOError, e:
1756                raise service_error(service_error.internal, 
1757                        "Cannot copy keyfiles: %s" % e)
1758
1759            # Allow the individual testbeds to access the configuration files.
1760            for tb in tbparams.keys():
1761                asignee = tbparams[tb]['allocID']['fedid']
1762                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1763                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1764
1765            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1766                    self.muxmax)
1767            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1768                    connInfo, expid)
1769            # Now get access to the dynamic testbeds (those added above)
1770            for tb in [ t for t in topo if t not in allocated]:
1771                self.get_access(tb, None, tbparams, access_user, masters)
1772                allocated[tb] = 1
1773                store_keys = topo[tb].get_attribute('store_keys')
1774                # Give the testbed access to keys it exports or imports
1775                if store_keys:
1776                    for sk in store_keys.split(" "):
1777                        self.auth.set_attribute(\
1778                                tbparams[tb]['allocID']['fedid'], sk)
1779
1780            self.wrangle_software(expid, top, topo, tbparams)
1781
1782            vtopo = topdl.topology_to_vtopo(top)
1783            vis = self.genviz(vtopo)
1784
1785            # save federant information
1786            for k in allocated.keys():
1787                tbparams[k]['federant'] = {
1788                        'name': [ { 'localname' : eid} ],
1789                        'allocID' : tbparams[k]['allocID'],
1790                        'uri': tbparams[k]['uri'],
1791                    }
1792
1793            self.state_lock.acquire()
1794            self.state[eid]['vtopo'] = vtopo
1795            self.state[eid]['vis'] = vis
1796            self.state[eid]['experimentdescription'] = \
1797                    { 'topdldescription': top.clone() }
1798            self.state[expid]['federant'] = \
1799                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1800                        if tbparams[tb].has_key('federant') ]
1801            if self.state_filename: 
1802                self.write_state()
1803            self.state_lock.release()
1804        except service_error, e:
1805            # If something goes wrong in the parse (usually an access error)
1806            # clear the placeholder state.  From here on out the code delays
1807            # exceptions.  Failing at this point returns a fault to the remote
1808            # caller.
1809
1810            self.state_lock.acquire()
1811            del self.state[eid]
1812            del self.state[expid]
1813            if self.state_filename: self.write_state()
1814            self.state_lock.release()
1815            raise e
1816
1817
1818        # Start the background swapper and return the starting state.  From
1819        # here on out, the state will stick around a while.
1820
1821        # Let users touch the state
1822        self.auth.set_attribute(fid, expid)
1823        self.auth.set_attribute(expid, expid)
1824        # Override fedids can manipulate state as well
1825        for o in self.overrides:
1826            self.auth.set_attribute(o, expid)
1827
1828        # Create a logger that logs to the experiment's state object as well as
1829        # to the main log file.
1830        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1831        alloc_collector = self.list_log(self.state[eid]['log'])
1832        h = logging.StreamHandler(alloc_collector)
1833        # XXX: there should be a global one of these rather than repeating the
1834        # code.
1835        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1836                    '%d %b %y %H:%M:%S'))
1837        alloc_log.addHandler(h)
1838       
1839        attrs = [ 
1840                {
1841                    'attribute': 'ssh_pubkey', 
1842                    'value': '%s/%s/config/%s' % \
1843                            (self.repo_url, expid, gw_pubkey_base)
1844                },
1845                {
1846                    'attribute': 'ssh_secretkey', 
1847                    'value': '%s/%s/config/%s' % \
1848                            (self.repo_url, expid, gw_secretkey_base)
1849                },
1850                {
1851                    'attribute': 'hosts', 
1852                    'value': '%s/%s/config/hosts' % \
1853                            (self.repo_url, expid)
1854                },
1855            ]
1856
1857        # transit and disconnected testbeds may not have a connInfo entry.
1858        # Fill in the blanks.
1859        for t in allocated.keys():
1860            if not connInfo.has_key(t):
1861                connInfo[t] = { }
1862
1863        # Start a thread to do the resource allocation
1864        t  = Thread(target=self.allocate_resources,
1865                args=(allocated, masters, eid, expid, tbparams, 
1866                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1867                    connInfo),
1868                name=eid)
1869        t.start()
1870
1871        rv = {
1872                'experimentID': [
1873                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1874                ],
1875                'experimentStatus': 'starting',
1876            }
1877
1878        return rv
1879   
1880    def get_experiment_fedid(self, key):
1881        """
1882        find the fedid associated with the localname key in the state database.
1883        """
1884
1885        rv = None
1886        self.state_lock.acquire()
1887        if self.state.has_key(key):
1888            if isinstance(self.state[key], dict):
1889                try:
1890                    kl = [ f['fedid'] for f in \
1891                            self.state[key]['experimentID']\
1892                                if f.has_key('fedid') ]
1893                except KeyError:
1894                    self.state_lock.release()
1895                    raise service_error(service_error.internal, 
1896                            "No fedid for experiment %s when getting "+\
1897                                    "fedid(!?)" % key)
1898                if len(kl) == 1:
1899                    rv = kl[0]
1900                else:
1901                    self.state_lock.release()
1902                    raise service_error(service_error.internal, 
1903                            "multiple fedids for experiment %s when " +\
1904                                    "getting fedid(!?)" % key)
1905            else:
1906                self.state_lock.release()
1907                raise service_error(service_error.internal, 
1908                        "Unexpected state for %s" % key)
1909        self.state_lock.release()
1910        return rv
1911
1912    def check_experiment_access(self, fid, key):
1913        """
1914        Confirm that the fid has access to the experiment.  Though a request
1915        may be made in terms of a local name, the access attribute is always
1916        the experiment's fedid.
1917        """
1918        if not isinstance(key, fedid):
1919            key = self.get_experiment_fedid(key)
1920
1921        if self.auth.check_attribute(fid, key):
1922            return True
1923        else:
1924            raise service_error(service_error.access, "Access Denied")
1925
1926
1927    def get_handler(self, path, fid):
1928        self.log.info("Get handler %s %s" % (path, fid))
1929        if self.auth.check_attribute(fid, path):
1930            return ("%s/%s" % (self.repodir, path), "application/binary")
1931        else:
1932            return (None, None)
1933
1934    def get_vtopo(self, req, fid):
1935        """
1936        Return the stored virtual topology for this experiment
1937        """
1938        rv = None
1939        state = None
1940
1941        req = req.get('VtopoRequestBody', None)
1942        if not req:
1943            raise service_error(service_error.req,
1944                    "Bad request format (no VtopoRequestBody)")
1945        exp = req.get('experiment', None)
1946        if exp:
1947            if exp.has_key('fedid'):
1948                key = exp['fedid']
1949                keytype = "fedid"
1950            elif exp.has_key('localname'):
1951                key = exp['localname']
1952                keytype = "localname"
1953            else:
1954                raise service_error(service_error.req, "Unknown lookup type")
1955        else:
1956            raise service_error(service_error.req, "No request?")
1957
1958        self.check_experiment_access(fid, key)
1959
1960        self.state_lock.acquire()
1961        if self.state.has_key(key):
1962            if self.state[key].has_key('vtopo'):
1963                rv = { 'experiment' : {keytype: key },\
1964                        'vtopo': self.state[key]['vtopo'],\
1965                    }
1966            else:
1967                state = self.state[key]['experimentStatus']
1968        self.state_lock.release()
1969
1970        if rv: return rv
1971        else: 
1972            if state:
1973                raise service_error(service_error.partial, 
1974                        "Not ready: %s" % state)
1975            else:
1976                raise service_error(service_error.req, "No such experiment")
1977
1978    def get_vis(self, req, fid):
1979        """
1980        Return the stored visualization for this experiment
1981        """
1982        rv = None
1983        state = None
1984
1985        req = req.get('VisRequestBody', None)
1986        if not req:
1987            raise service_error(service_error.req,
1988                    "Bad request format (no VisRequestBody)")
1989        exp = req.get('experiment', None)
1990        if exp:
1991            if exp.has_key('fedid'):
1992                key = exp['fedid']
1993                keytype = "fedid"
1994            elif exp.has_key('localname'):
1995                key = exp['localname']
1996                keytype = "localname"
1997            else:
1998                raise service_error(service_error.req, "Unknown lookup type")
1999        else:
2000            raise service_error(service_error.req, "No request?")
2001
2002        self.check_experiment_access(fid, key)
2003
2004        self.state_lock.acquire()
2005        if self.state.has_key(key):
2006            if self.state[key].has_key('vis'):
2007                rv =  { 'experiment' : {keytype: key },\
2008                        'vis': self.state[key]['vis'],\
2009                        }
2010            else:
2011                state = self.state[key]['experimentStatus']
2012        self.state_lock.release()
2013
2014        if rv: return rv
2015        else:
2016            if state:
2017                raise service_error(service_error.partial, 
2018                        "Not ready: %s" % state)
2019            else:
2020                raise service_error(service_error.req, "No such experiment")
2021
2022    def clean_info_response(self, rv):
2023        """
2024        Remove the information in the experiment's state object that is not in
2025        the info response.
2026        """
2027        # Remove the owner info (should always be there, but...)
2028        if rv.has_key('owner'): del rv['owner']
2029
2030        # Convert the log into the allocationLog parameter and remove the
2031        # log entry (with defensive programming)
2032        if rv.has_key('log'):
2033            rv['allocationLog'] = "".join(rv['log'])
2034            del rv['log']
2035        else:
2036            rv['allocationLog'] = ""
2037
2038        if rv['experimentStatus'] != 'active':
2039            if rv.has_key('federant'): del rv['federant']
2040        else:
2041            # remove the allocationID and uri info from each federant
2042            for f in rv.get('federant', []):
2043                if f.has_key('allocID'): del f['allocID']
2044                if f.has_key('uri'): del f['uri']
2045
2046        return rv
2047
2048    def get_info(self, req, fid):
2049        """
2050        Return all the stored info about this experiment
2051        """
2052        rv = None
2053
2054        req = req.get('InfoRequestBody', None)
2055        if not req:
2056            raise service_error(service_error.req,
2057                    "Bad request format (no InfoRequestBody)")
2058        exp = req.get('experiment', None)
2059        if exp:
2060            if exp.has_key('fedid'):
2061                key = exp['fedid']
2062                keytype = "fedid"
2063            elif exp.has_key('localname'):
2064                key = exp['localname']
2065                keytype = "localname"
2066            else:
2067                raise service_error(service_error.req, "Unknown lookup type")
2068        else:
2069            raise service_error(service_error.req, "No request?")
2070
2071        self.check_experiment_access(fid, key)
2072
2073        # The state may be massaged by the service function that called
2074        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2075        # state.
2076        self.state_lock.acquire()
2077        if self.state.has_key(key):
2078            rv = copy.deepcopy(self.state[key])
2079        self.state_lock.release()
2080
2081        if rv:
2082            return self.clean_info_response(rv)
2083        else:
2084            raise service_error(service_error.req, "No such experiment")
2085
2086    def get_multi_info(self, req, fid):
2087        """
2088        Return all the stored info that this fedid can access
2089        """
2090        rv = { 'info': [ ] }
2091
2092        self.state_lock.acquire()
2093        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2094            try:
2095                self.check_experiment_access(fid, key)
2096            except service_error, e:
2097                if e.code == service_error.access:
2098                    continue
2099                else:
2100                    self.state_lock.release()
2101                    raise e
2102
2103            if self.state.has_key(key):
2104                e = copy.deepcopy(self.state[key])
2105                e = self.clean_info_response(e)
2106                rv['info'].append(e)
2107        self.state_lock.release()
2108        return rv
2109
2110    def terminate_experiment(self, req, fid):
2111        """
2112        Swap this experiment out on the federants and delete the shared
2113        information
2114        """
2115        tbparams = { }
2116        req = req.get('TerminateRequestBody', None)
2117        if not req:
2118            raise service_error(service_error.req,
2119                    "Bad request format (no TerminateRequestBody)")
2120        force = req.get('force', False)
2121        exp = req.get('experiment', None)
2122        if exp:
2123            if exp.has_key('fedid'):
2124                key = exp['fedid']
2125                keytype = "fedid"
2126            elif exp.has_key('localname'):
2127                key = exp['localname']
2128                keytype = "localname"
2129            else:
2130                raise service_error(service_error.req, "Unknown lookup type")
2131        else:
2132            raise service_error(service_error.req, "No request?")
2133
2134        self.check_experiment_access(fid, key)
2135
2136        dealloc_list = [ ]
2137
2138
2139        # Create a logger that logs to the dealloc_list as well as to the main
2140        # log file.
2141        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2142        h = logging.StreamHandler(self.list_log(dealloc_list))
2143        # XXX: there should be a global one of these rather than repeating the
2144        # code.
2145        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2146                    '%d %b %y %H:%M:%S'))
2147        dealloc_log.addHandler(h)
2148
2149        self.state_lock.acquire()
2150        fed_exp = self.state.get(key, None)
2151
2152        if fed_exp:
2153            # This branch of the conditional holds the lock to generate a
2154            # consistent temporary tbparams variable to deallocate experiments.
2155            # It releases the lock to do the deallocations and reacquires it to
2156            # remove the experiment state when the termination is complete.
2157
2158            # First make sure that the experiment creation is complete.
2159            status = fed_exp.get('experimentStatus', None)
2160
2161            if status:
2162                if status in ('starting', 'terminating'):
2163                    if not force:
2164                        self.state_lock.release()
2165                        raise service_error(service_error.partial, 
2166                                'Experiment still being created or destroyed')
2167                    else:
2168                        self.log.warning('Experiment in %s state ' % status + \
2169                                'being terminated by force.')
2170            else:
2171                # No status??? trouble
2172                self.state_lock.release()
2173                raise service_error(service_error.internal,
2174                        "Experiment has no status!?")
2175
2176            ids = []
2177            #  experimentID is a list of dicts that are self-describing
2178            #  identifiers.  This finds all the fedids and localnames - the
2179            #  keys of self.state - and puts them into ids.
2180            for id in fed_exp.get('experimentID', []):
2181                if id.has_key('fedid'): ids.append(id['fedid'])
2182                if id.has_key('localname'): ids.append(id['localname'])
2183
2184            # Collect the allocation/segment ids into a dict keyed by the fedid
2185            # of the allocation (or a monotonically increasing integer) that
2186            # contains a tuple of uri, aid (which is a dict...)
2187            for i, fed in enumerate(fed_exp.get('federant', [])):
2188                try:
2189                    uri = fed['uri']
2190                    aid = fed['allocID']
2191                    k = fed['allocID'].get('fedid', i)
2192                except KeyError, e:
2193                    continue
2194                tbparams[k] = (uri, aid)
2195            fed_exp['experimentStatus'] = 'terminating'
2196            if self.state_filename: self.write_state()
2197            self.state_lock.release()
2198
2199            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2200            # then completes, so we can't wait if nothing starts.  So, no
2201            # tbparams, no start.
2202            if len(tbparams) > 0:
2203                thread_pool = self.thread_pool(self.nthreads)
2204                for k in tbparams.keys():
2205                    # Create and start a thread to stop the segment
2206                    thread_pool.wait_for_slot()
2207                    uri, aid = tbparams[k]
2208                    t  = self.pooled_thread(\
2209                            target=self.terminate_segment(log=dealloc_log,
2210                                testbed=uri,
2211                                cert_file=self.cert_file, 
2212                                cert_pwd=self.cert_pwd,
2213                                trusted_certs=self.trusted_certs,
2214                                caller=self.call_TerminateSegment),
2215                            args=(uri, aid), name=k,
2216                            pdata=thread_pool, trace_file=self.trace_file)
2217                    t.start()
2218                # Wait for completions
2219                thread_pool.wait_for_all_done()
2220
2221            # release the allocations (failed experiments have done this
2222            # already, and starting experiments may be in odd states, so we
2223            # ignore errors releasing those allocations
2224            try: 
2225                for k in tbparams.keys():
2226                    # This releases access by uri
2227                    uri, aid = tbparams[k]
2228                    self.release_access(None, aid, uri=uri)
2229            except service_error, e:
2230                if status != 'failed' and not force:
2231                    raise e
2232
2233            # Remove the terminated experiment
2234            self.state_lock.acquire()
2235            for id in ids:
2236                if self.state.has_key(id): del self.state[id]
2237
2238            if self.state_filename: self.write_state()
2239            self.state_lock.release()
2240
2241            # Delete any synch points associated with this experiment.  All
2242            # synch points begin with the fedid of the experiment.
2243            fedid_keys = set(["fedid:%s" % f for f in ids \
2244                    if isinstance(f, fedid)])
2245            for k in self.synch_store.all_keys():
2246                try:
2247                    if len(k) > 45 and k[0:46] in fedid_keys:
2248                        self.synch_store.del_value(k)
2249                except synch_store.BadDeletionError:
2250                    pass
2251            self.write_store()
2252               
2253            return { 
2254                    'experiment': exp , 
2255                    'deallocationLog': "".join(dealloc_list),
2256                    }
2257        else:
2258            # Don't forget to release the lock
2259            self.state_lock.release()
2260            raise service_error(service_error.req, "No saved state")
2261
2262
2263    def GetValue(self, req, fid):
2264        """
2265        Get a value from the synchronized store
2266        """
2267        req = req.get('GetValueRequestBody', None)
2268        if not req:
2269            raise service_error(service_error.req,
2270                    "Bad request format (no GetValueRequestBody)")
2271       
2272        name = req['name']
2273        wait = req['wait']
2274        rv = { 'name': name }
2275
2276        if self.auth.check_attribute(fid, name):
2277            self.log.debug("[GetValue] asking for %s " % name)
2278            try:
2279                v = self.synch_store.get_value(name, wait)
2280            except synch_store.RevokedKeyError:
2281                # No more synch on this key
2282                raise service_error(service_error.federant, 
2283                        "Synch key %s revoked" % name)
2284            if v is not None:
2285                rv['value'] = v
2286            self.log.debug("[GetValue] got %s from %s" % (v, name))
2287            return rv
2288        else:
2289            raise service_error(service_error.access, "Access Denied")
2290       
2291
2292    def SetValue(self, req, fid):
2293        """
2294        Set a value in the synchronized store
2295        """
2296        req = req.get('SetValueRequestBody', None)
2297        if not req:
2298            raise service_error(service_error.req,
2299                    "Bad request format (no SetValueRequestBody)")
2300       
2301        name = req['name']
2302        v = req['value']
2303
2304        if self.auth.check_attribute(fid, name):
2305            try:
2306                self.synch_store.set_value(name, v)
2307                self.write_store()
2308                self.log.debug("[SetValue] set %s to %s" % (name, v))
2309            except synch_store.CollisionError:
2310                # Translate into a service_error
2311                raise service_error(service_error.req,
2312                        "Value already set: %s" %name)
2313            except synch_store.RevokedKeyError:
2314                # No more synch on this key
2315                raise service_error(service_error.federant, 
2316                        "Synch key %s revoked" % name)
2317            return { 'name': name, 'value': v }
2318        else:
2319            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.