source: fedd/federation/experiment_control.py @ f432e51

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f432e51 was c5869ef, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint and debugging

  • Property mode set to 100644
File size: 81.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55    def __str__(self):
56        return "name %s export %s import %s params %s reqs %s" % \
57                (self.name, self.exporter, self.importers, self.params,
58                        [ (r['name'], r['visibility']) for r in self.reqs] )
59
60class experiment_control_local:
61    """
62    Control of experiments that this system can directly access.
63
64    Includes experiment creation, termination and information dissemination.
65    Thred safe.
66    """
67
68    class ssh_cmd_timeout(RuntimeError): pass
69   
70    class thread_pool:
71        """
72        A class to keep track of a set of threads all invoked for the same
73        task.  Manages the mutual exclusion of the states.
74        """
75        def __init__(self, nthreads):
76            """
77            Start a pool.
78            """
79            self.changed = Condition()
80            self.started = 0
81            self.terminated = 0
82            self.nthreads = nthreads
83
84        def acquire(self):
85            """
86            Get the pool's lock.
87            """
88            self.changed.acquire()
89
90        def release(self):
91            """
92            Release the pool's lock.
93            """
94            self.changed.release()
95
96        def wait(self, timeout = None):
97            """
98            Wait for a pool thread to start or stop.
99            """
100            self.changed.wait(timeout)
101
102        def start(self):
103            """
104            Called by a pool thread to report starting.
105            """
106            self.changed.acquire()
107            self.started += 1
108            self.changed.notifyAll()
109            self.changed.release()
110
111        def terminate(self):
112            """
113            Called by a pool thread to report finishing.
114            """
115            self.changed.acquire()
116            self.terminated += 1
117            self.changed.notifyAll()
118            self.changed.release()
119
120        def clear(self):
121            """
122            Clear all pool data.
123            """
124            self.changed.acquire()
125            self.started = 0
126            self.terminated =0
127            self.changed.notifyAll()
128            self.changed.release()
129
130        def wait_for_slot(self):
131            """
132            Wait until we have a free slot to start another pooled thread
133            """
134            self.acquire()
135            while self.started - self.terminated >= self.nthreads:
136                self.wait()
137            self.release()
138
139        def wait_for_all_done(self, timeout=None):
140            """
141            Wait until all active threads finish (and at least one has
142            started).  If a timeout is given, return after waiting that long
143            for termination.  If all threads are done (and one has started in
144            the since the last clear()) return True, otherwise False.
145            """
146            if timeout:
147                deadline = time.time() + timeout
148            self.acquire()
149            while self.started == 0 or self.started > self.terminated:
150                self.wait(timeout)
151                if timeout:
152                    if time.time() > deadline:
153                        break
154                    timeout = deadline - time.time()
155            self.release()
156            return not (self.started == 0 or self.started > self.terminated)
157
158    class pooled_thread(Thread):
159        """
160        One of a set of threads dedicated to a specific task.  Uses the
161        thread_pool class above for coordination.
162        """
163        def __init__(self, group=None, target=None, name=None, args=(), 
164                kwargs={}, pdata=None, trace_file=None):
165            Thread.__init__(self, group, target, name, args, kwargs)
166            self.rv = None          # Return value of the ops in this thread
167            self.exception = None   # Exception that terminated this thread
168            self.target=target      # Target function to run on start()
169            self.args = args        # Args to pass to target
170            self.kwargs = kwargs    # Additional kw args
171            self.pdata = pdata      # thread_pool for this class
172            # Logger for this thread
173            self.log = logging.getLogger("fedd.experiment_control")
174       
175        def run(self):
176            """
177            Emulate Thread.run, except add pool data manipulation and error
178            logging.
179            """
180            if self.pdata:
181                self.pdata.start()
182
183            if self.target:
184                try:
185                    self.rv = self.target(*self.args, **self.kwargs)
186                except service_error, s:
187                    self.exception = s
188                    self.log.error("Thread exception: %s %s" % \
189                            (s.code_string(), s.desc))
190                except:
191                    self.exception = sys.exc_info()[1]
192                    self.log.error(("Unexpected thread exception: %s" +\
193                            "Trace %s") % (self.exception,\
194                                traceback.format_exc()))
195            if self.pdata:
196                self.pdata.terminate()
197
198    call_RequestAccess = service_caller('RequestAccess')
199    call_ReleaseAccess = service_caller('ReleaseAccess')
200    call_StartSegment = service_caller('StartSegment')
201    call_TerminateSegment = service_caller('TerminateSegment')
202    call_Ns2Topdl = service_caller('Ns2Topdl')
203
204    def __init__(self, config=None, auth=None):
205        """
206        Intialize the various attributes, most from the config object
207        """
208
209        def parse_tarfile_list(tf):
210            """
211            Parse a tarfile list from the configuration.  This is a set of
212            paths and tarfiles separated by spaces.
213            """
214            rv = [ ]
215            if tf is not None:
216                tl = tf.split()
217                while len(tl) > 1:
218                    p, t = tl[0:2]
219                    del tl[0:2]
220                    rv.append((p, t))
221            return rv
222
223        self.thread_with_rv = experiment_control_local.pooled_thread
224        self.thread_pool = experiment_control_local.thread_pool
225        self.list_log = list_log.list_log
226
227        self.cert_file = config.get("experiment_control", "cert_file")
228        if self.cert_file:
229            self.cert_pwd = config.get("experiment_control", "cert_pwd")
230        else:
231            self.cert_file = config.get("globals", "cert_file")
232            self.cert_pwd = config.get("globals", "cert_pwd")
233
234        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
235                or config.get("globals", "trusted_certs")
236
237        self.repodir = config.get("experiment_control", "repodir")
238        self.repo_url = config.get("experiment_control", "repo_url", 
239                "https://users.isi.deterlab.net:23235");
240
241        self.exp_stem = "fed-stem"
242        self.log = logging.getLogger("fedd.experiment_control")
243        set_log_level(config, "experiment_control", self.log)
244        self.muxmax = 2
245        self.nthreads = 10
246        self.randomize_experiments = False
247
248        self.splitter = None
249        self.ssh_keygen = "/usr/bin/ssh-keygen"
250        self.ssh_identity_file = None
251
252
253        self.debug = config.getboolean("experiment_control", "create_debug")
254        self.cleanup = not config.getboolean("experiment_control", 
255                "leave_tmpfiles")
256        self.state_filename = config.get("experiment_control", 
257                "experiment_state")
258        self.store_filename = config.get("experiment_control", 
259                "synch_store")
260        self.store_url = config.get("experiment_control", "store_url")
261        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
262        self.fedkit = parse_tarfile_list(\
263                config.get("experiment_control", "fedkit"))
264        self.gatewaykit = parse_tarfile_list(\
265                config.get("experiment_control", "gatewaykit"))
266        accessdb_file = config.get("experiment_control", "accessdb")
267
268        self.ssh_pubkey_file = config.get("experiment_control", 
269                "ssh_pubkey_file")
270        self.ssh_privkey_file = config.get("experiment_control",
271                "ssh_privkey_file")
272        dt = config.get("experiment_control", "direct_transit")
273        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
274        else: self.direct_transit = [ ]
275        # NB for internal master/slave ops, not experiment setup
276        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
277
278        self.overrides = set([])
279        ovr = config.get('experiment_control', 'overrides')
280        if ovr:
281            for o in ovr.split(","):
282                o = o.strip()
283                if o.startswith('fedid:'): o = o[len('fedid:'):]
284                self.overrides.add(fedid(hexstr=o))
285
286        self.state = { }
287        self.state_lock = Lock()
288        self.tclsh = "/usr/local/bin/otclsh"
289        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
290                config.get("experiment_control", "tcl_splitter",
291                        "/usr/testbed/lib/ns2ir/parse.tcl")
292        mapdb_file = config.get("experiment_control", "mapdb")
293        self.trace_file = sys.stderr
294
295        self.def_expstart = \
296                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
297                "/tmp/federate";
298        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
299                "FEDDIR/hosts";
300        self.def_gwstart = \
301                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
302                "/tmp/bridge.log";
303        self.def_mgwstart = \
304                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
305                "/tmp/bridge.log";
306        self.def_gwimage = "FBSD61-TUNNEL2";
307        self.def_gwtype = "pc";
308        self.local_access = { }
309
310        if auth:
311            self.auth = auth
312        else:
313            self.log.error(\
314                    "[access]: No authorizer initialized, creating local one.")
315            auth = authorizer()
316
317
318        if self.ssh_pubkey_file:
319            try:
320                f = open(self.ssh_pubkey_file, 'r')
321                self.ssh_pubkey = f.read()
322                f.close()
323            except EnvironmentError:
324                raise service_error(service_error.internal,
325                        "Cannot read sshpubkey")
326        else:
327            raise service_error(service_error.internal, 
328                    "No SSH public key file?")
329
330        if not self.ssh_privkey_file:
331            raise service_error(service_error.internal, 
332                    "No SSH public key file?")
333
334
335        if mapdb_file:
336            self.read_mapdb(mapdb_file)
337        else:
338            self.log.warn("[experiment_control] No testbed map, using defaults")
339            self.tbmap = { 
340                    'deter':'https://users.isi.deterlab.net:23235',
341                    'emulab':'https://users.isi.deterlab.net:23236',
342                    'ucb':'https://users.isi.deterlab.net:23237',
343                    }
344
345        if accessdb_file:
346                self.read_accessdb(accessdb_file)
347        else:
348            raise service_error(service_error.internal,
349                    "No accessdb specified in config")
350
351        # Grab saved state.  OK to do this w/o locking because it's read only
352        # and only one thread should be in existence that can see self.state at
353        # this point.
354        if self.state_filename:
355            self.read_state()
356
357        if self.store_filename:
358            self.read_store()
359        else:
360            self.log.warning("No saved synch store")
361            self.synch_store = synch_store
362
363        # Dispatch tables
364        self.soap_services = {\
365                'New': soap_handler('New', self.new_experiment),
366                'Create': soap_handler('Create', self.create_experiment),
367                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
368                'Vis': soap_handler('Vis', self.get_vis),
369                'Info': soap_handler('Info', self.get_info),
370                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
371                'Terminate': soap_handler('Terminate', 
372                    self.terminate_experiment),
373                'GetValue': soap_handler('GetValue', self.GetValue),
374                'SetValue': soap_handler('SetValue', self.SetValue),
375        }
376
377        self.xmlrpc_services = {\
378                'New': xmlrpc_handler('New', self.new_experiment),
379                'Create': xmlrpc_handler('Create', self.create_experiment),
380                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
381                'Vis': xmlrpc_handler('Vis', self.get_vis),
382                'Info': xmlrpc_handler('Info', self.get_info),
383                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
384                'Terminate': xmlrpc_handler('Terminate',
385                    self.terminate_experiment),
386                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
387                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
388        }
389
390    # Call while holding self.state_lock
391    def write_state(self):
392        """
393        Write a new copy of experiment state after copying the existing state
394        to a backup.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        if os.access(self.state_filename, os.W_OK):
399            copy_file(self.state_filename, \
400                    "%s.bak" % self.state_filename)
401        try:
402            f = open(self.state_filename, 'w')
403            pickle.dump(self.state, f)
404        except EnvironmentError, e:
405            self.log.error("Can't write file %s: %s" % \
406                    (self.state_filename, e))
407        except pickle.PicklingError, e:
408            self.log.error("Pickling problem: %s" % e)
409        except TypeError, e:
410            self.log.error("Pickling problem (TypeError): %s" % e)
411
412    @staticmethod
413    def get_alloc_ids(state):
414        """
415        Pull the fedids of the identifiers of each allocation from the
416        state.  Again, a dict dive that's best isolated.
417
418        Used by read_store and read state
419        """
420
421        return [ f['allocID']['fedid'] 
422                for f in state.get('federant',[]) \
423                    if f.has_key('allocID') and \
424                        f['allocID'].has_key('fedid')]
425
426    # Call while holding self.state_lock
427    def read_state(self):
428        """
429        Read a new copy of experiment state.  Old state is overwritten.
430
431        State format is a simple pickling of the state dictionary.
432        """
433       
434        def get_experiment_id(state):
435            """
436            Pull the fedid experimentID out of the saved state.  This is kind
437            of a gross walk through the dict.
438            """
439
440            if state.has_key('experimentID'):
441                for e in state['experimentID']:
442                    if e.has_key('fedid'):
443                        return e['fedid']
444                else:
445                    return None
446            else:
447                return None
448
449        try:
450            f = open(self.state_filename, "r")
451            self.state = pickle.load(f)
452            self.log.debug("[read_state]: Read state from %s" % \
453                    self.state_filename)
454        except EnvironmentError, e:
455            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
456                    % (self.state_filename, e))
457        except pickle.UnpicklingError, e:
458            self.log.warning(("[read_state]: No saved state: " + \
459                    "Unpickling failed: %s") % e)
460       
461        for s in self.state.values():
462            try:
463
464                eid = get_experiment_id(s)
465                if eid : 
466                    # Give the owner rights to the experiment
467                    self.auth.set_attribute(s['owner'], eid)
468                    # And holders of the eid as well
469                    self.auth.set_attribute(eid, eid)
470                    # allow overrides to control experiments as well
471                    for o in self.overrides:
472                        self.auth.set_attribute(o, eid)
473                    # Set permissions to allow reading of the software repo, if
474                    # any, as well.
475                    for a in self.get_alloc_ids(s):
476                        self.auth.set_attribute(a, 'repo/%s' % eid)
477                else:
478                    raise KeyError("No experiment id")
479            except KeyError, e:
480                self.log.warning("[read_state]: State ownership or identity " +\
481                        "misformatted in %s: %s" % (self.state_filename, e))
482
483
484    def read_accessdb(self, accessdb_file):
485        """
486        Read the mapping from fedids that can create experiments to their name
487        in the 3-level access namespace.  All will be asserted from this
488        testbed and can include the local username and porject that will be
489        asserted on their behalf by this fedd.  Each fedid is also added to the
490        authorization system with the "create" attribute.
491        """
492        self.accessdb = {}
493        # These are the regexps for parsing the db
494        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
495        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
496                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
497        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
498                "\s*->\s*(" + name_expr + ")\s*$")
499        lineno = 0
500
501        # Parse the mappings and store in self.authdb, a dict of
502        # fedid -> (proj, user)
503        try:
504            f = open(accessdb_file, "r")
505            for line in f:
506                lineno += 1
507                line = line.strip()
508                if len(line) == 0 or line.startswith('#'):
509                    continue
510                m = project_line.match(line)
511                if m:
512                    fid = fedid(hexstr=m.group(1))
513                    project, user = m.group(2,3)
514                    if not self.accessdb.has_key(fid):
515                        self.accessdb[fid] = []
516                    self.accessdb[fid].append((project, user))
517                    continue
518
519                m = user_line.match(line)
520                if m:
521                    fid = fedid(hexstr=m.group(1))
522                    project = None
523                    user = m.group(2)
524                    if not self.accessdb.has_key(fid):
525                        self.accessdb[fid] = []
526                    self.accessdb[fid].append((project, user))
527                    continue
528                self.log.warn("[experiment_control] Error parsing access " +\
529                        "db %s at line %d" %  (accessdb_file, lineno))
530        except EnvironmentError:
531            raise service_error(service_error.internal,
532                    ("Error opening/reading %s as experiment " +\
533                            "control accessdb") %  accessdb_file)
534        f.close()
535
536        # Initialize the authorization attributes
537        for fid in self.accessdb.keys():
538            self.auth.set_attribute(fid, 'create')
539            self.auth.set_attribute(fid, 'new')
540
541    def read_mapdb(self, file):
542        """
543        Read a simple colon separated list of mappings for the
544        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
545        """
546
547        self.tbmap = { }
548        lineno =0
549        try:
550            f = open(file, "r")
551            for line in f:
552                lineno += 1
553                line = line.strip()
554                if line.startswith('#') or len(line) == 0:
555                    continue
556                try:
557                    label, url = line.split(':', 1)
558                    self.tbmap[label] = url
559                except ValueError, e:
560                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
561                            "map db: %s %s" % (lineno, line, e))
562        except EnvironmentError, e:
563            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
564                    "open %s: %s" % (file, e))
565        f.close()
566
567    def read_store(self):
568        try:
569            self.synch_store = synch_store()
570            self.synch_store.load(self.store_filename)
571            self.log.debug("[read_store]: Read store from %s" % \
572                    self.store_filename)
573        except EnvironmentError, e:
574            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
575                    % (self.state_filename, e))
576            self.synch_store = synch_store()
577
578        # Set the initial permissions on data in the store.  XXX: This ad hoc
579        # authorization attribute initialization is getting out of hand.
580        for k in self.synch_store.all_keys():
581            try:
582                if k.startswith('fedid:'):
583                    fid = fedid(hexstr=k[6:46])
584                    if self.state.has_key(fid):
585                        for a in self.get_alloc_ids(self.state[fid]):
586                            self.auth.set_attribute(a, k)
587            except ValueError, e:
588                self.log.warn("Cannot deduce permissions for %s" % k)
589
590
591    def write_store(self):
592        """
593        Write a new copy of synch_store after writing current state
594        to a backup.  We use the internal synch_store pickle method to avoid
595        incinsistent data.
596
597        State format is a simple pickling of the store.
598        """
599        if os.access(self.store_filename, os.W_OK):
600            copy_file(self.store_filename, \
601                    "%s.bak" % self.store_filename)
602        try:
603            self.synch_store.save(self.store_filename)
604        except EnvironmentError, e:
605            self.log.error("Can't write file %s: %s" % \
606                    (self.store_filename, e))
607        except TypeError, e:
608            self.log.error("Pickling problem (TypeError): %s" % e)
609
610       
611    def generate_ssh_keys(self, dest, type="rsa" ):
612        """
613        Generate a set of keys for the gateways to use to talk.
614
615        Keys are of type type and are stored in the required dest file.
616        """
617        valid_types = ("rsa", "dsa")
618        t = type.lower();
619        if t not in valid_types: raise ValueError
620        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
621
622        try:
623            trace = open("/dev/null", "w")
624        except EnvironmentError:
625            raise service_error(service_error.internal,
626                    "Cannot open /dev/null??");
627
628        # May raise CalledProcessError
629        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
630        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
631        if rv != 0:
632            raise service_error(service_error.internal, 
633                    "Cannot generate nonce ssh keys.  %s return code %d" \
634                            % (self.ssh_keygen, rv))
635
636    def gentopo(self, str):
637        """
638        Generate the topology dtat structure from the splitter's XML
639        representation of it.
640
641        The topology XML looks like:
642            <experiment>
643                <nodes>
644                    <node><vname></vname><ips>ip1:ip2</ips></node>
645                </nodes>
646                <lans>
647                    <lan>
648                        <vname></vname><vnode></vnode><ip></ip>
649                        <bandwidth></bandwidth><member>node:port</member>
650                    </lan>
651                </lans>
652        """
653        class topo_parse:
654            """
655            Parse the topology XML and create the dats structure.
656            """
657            def __init__(self):
658                # Typing of the subelements for data conversion
659                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
660                self.int_subelements = ( 'bandwidth',)
661                self.float_subelements = ( 'delay',)
662                # The final data structure
663                self.nodes = [ ]
664                self.lans =  [ ]
665                self.topo = { \
666                        'node': self.nodes,\
667                        'lan' : self.lans,\
668                    }
669                self.element = { }  # Current element being created
670                self.chars = ""     # Last text seen
671
672            def end_element(self, name):
673                # After each sub element the contents is added to the current
674                # element or to the appropriate list.
675                if name == 'node':
676                    self.nodes.append(self.element)
677                    self.element = { }
678                elif name == 'lan':
679                    self.lans.append(self.element)
680                    self.element = { }
681                elif name in self.str_subelements:
682                    self.element[name] = self.chars
683                    self.chars = ""
684                elif name in self.int_subelements:
685                    self.element[name] = int(self.chars)
686                    self.chars = ""
687                elif name in self.float_subelements:
688                    self.element[name] = float(self.chars)
689                    self.chars = ""
690
691            def found_chars(self, data):
692                self.chars += data.rstrip()
693
694
695        tp = topo_parse();
696        parser = xml.parsers.expat.ParserCreate()
697        parser.EndElementHandler = tp.end_element
698        parser.CharacterDataHandler = tp.found_chars
699
700        parser.Parse(str)
701
702        return tp.topo
703       
704
705    def genviz(self, topo):
706        """
707        Generate the visualization the virtual topology
708        """
709
710        neato = "/usr/local/bin/neato"
711        # These are used to parse neato output and to create the visualization
712        # file.
713        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
714        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
715                "%s</type></node>"
716
717        try:
718            # Node names
719            nodes = [ n['vname'] for n in topo['node'] ]
720            topo_lans = topo['lan']
721        except KeyError, e:
722            raise service_error(service_error.internal, "Bad topology: %s" %e)
723
724        lans = { }
725        links = { }
726
727        # Walk through the virtual topology, organizing the connections into
728        # 2-node connections (links) and more-than-2-node connections (lans).
729        # When a lan is created, it's added to the list of nodes (there's a
730        # node in the visualization for the lan).
731        for l in topo_lans:
732            if links.has_key(l['vname']):
733                if len(links[l['vname']]) < 2:
734                    links[l['vname']].append(l['vnode'])
735                else:
736                    nodes.append(l['vname'])
737                    lans[l['vname']] = links[l['vname']]
738                    del links[l['vname']]
739                    lans[l['vname']].append(l['vnode'])
740            elif lans.has_key(l['vname']):
741                lans[l['vname']].append(l['vnode'])
742            else:
743                links[l['vname']] = [ l['vnode'] ]
744
745
746        # Open up a temporary file for dot to turn into a visualization
747        try:
748            df, dotname = tempfile.mkstemp()
749            dotfile = os.fdopen(df, 'w')
750        except EnvironmentError:
751            raise service_error(service_error.internal,
752                    "Failed to open file in genviz")
753
754        try:
755            dnull = open('/dev/null', 'w')
756        except EnvironmentError:
757            service_error(service_error.internal,
758                    "Failed to open /dev/null in genviz")
759
760        # Generate a dot/neato input file from the links, nodes and lans
761        try:
762            print >>dotfile, "graph G {"
763            for n in nodes:
764                print >>dotfile, '\t"%s"' % n
765            for l in links.keys():
766                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
767            for l in lans.keys():
768                for n in lans[l]:
769                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
770            print >>dotfile, "}"
771            dotfile.close()
772        except TypeError:
773            raise service_error(service_error.internal,
774                    "Single endpoint link in vtopo")
775        except EnvironmentError:
776            raise service_error(service_error.internal, "Cannot write dot file")
777
778        # Use dot to create a visualization
779        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
780                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
781                close_fds=True)
782        dnull.close()
783
784        # Translate dot to vis format
785        vis_nodes = [ ]
786        vis = { 'node': vis_nodes }
787        for line in dot.stdout:
788            m = vis_re.match(line)
789            if m:
790                vn = m.group(1)
791                vis_node = {'name': vn, \
792                        'x': float(m.group(2)),\
793                        'y' : float(m.group(3)),\
794                    }
795                if vn in links.keys() or vn in lans.keys():
796                    vis_node['type'] = 'lan'
797                else:
798                    vis_node['type'] = 'node'
799                vis_nodes.append(vis_node)
800        rv = dot.wait()
801
802        os.remove(dotname)
803        if rv == 0 : return vis
804        else: return None
805
806    def get_access(self, tb, nodes, tbparam, access_user, masters):
807        """
808        Get access to testbed through fedd and set the parameters for that tb
809        """
810        def get_export_project(svcs):
811            """
812            Look through for the list of federated_service for this testbed
813            objects for a project_export service, and extract the project
814            parameter.
815            """
816
817            pe = [s for s in svcs if s.name=='project_export']
818            if len(pe) == 1:
819                return pe[0].params.get('project', None)
820            elif len(pe) == 0:
821                return None
822            else:
823                raise service_error(service_error.req,
824                        "More than one project export is not supported")
825
826        uri = self.tbmap.get(testbed_base(tb), None)
827        if not uri:
828            raise service_error(service_error.server_config, 
829                    "Unknown testbed: %s" % tb)
830
831        export_svcs = masters.get(tb,[])
832        import_svcs = [ s for m in masters.values() \
833                for s in m \
834                    if tb in s.importers ]
835
836        export_project = get_export_project(export_svcs)
837
838        # Tweak search order so that if there are entries in access_user that
839        # have a project matching the export project, we try them first
840        if export_project: 
841            access_sequence = [ (p, u) for p, u in access_user \
842                    if p == export_project] 
843            access_sequence.extend([(p, u) for p, u in access_user \
844                    if p != export_project]) 
845        else: 
846            access_sequence = access_user
847
848        for p, u in access_sequence: 
849            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
850                    "to %s") %  ((p or "None"), u, uri))
851
852            if p:
853                # Request with user and project specified
854                req = {\
855                        'destinationTestbed' : { 'uri' : uri },
856                        'credential': [ "project: %s" % p, "user: %s"  % u],
857                        'allocID' : { 'localname': 'test' },
858                    }
859            else:
860                # Request with only user specified
861                req = {\
862                        'destinationTestbed' : { 'uri' : uri },
863                        'credential': [ 'user: %s' % u ],
864                        'allocID' : { 'localname': 'test' },
865                    }
866
867            # Make the service request from the services we're importing and
868            # exporting.  Keep track of the export request ids so we can
869            # collect the resulting info from the access response.
870            e_keys = { }
871            if import_svcs or export_svcs:
872                req['service'] = [ ]
873
874                for i, s in enumerate(import_svcs):
875                    idx = 'import%d' % i
876                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
877                    if s.params:
878                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
879                                for k, v in s.params.items()]
880                    req['service'].append(sr)
881
882                for i, s in enumerate(export_svcs):
883                    idx = 'export%d' % i
884                    e_keys[idx] = s
885                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
886                    if s.params:
887                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
888                                for k, v in s.params.items()]
889                    req['service'].append(sr)
890
891            # node resources if any
892            if nodes != None and len(nodes) > 0:
893                rnodes = [ ]
894                for n in nodes:
895                    rn = { }
896                    image, hw, count = n.split(":")
897                    if image: rn['image'] = [ image ]
898                    if hw: rn['hardware'] = [ hw ]
899                    if count and int(count) >0 : rn['count'] = int(count)
900                    rnodes.append(rn)
901                req['resources']= { }
902                req['resources']['node'] = rnodes
903
904            try:
905                if self.local_access.has_key(uri):
906                    # Local access call
907                    req = { 'RequestAccessRequestBody' : req }
908                    r = self.local_access[uri].RequestAccess(req, 
909                            fedid(file=self.cert_file))
910                    r = { 'RequestAccessResponseBody' : r }
911                else:
912                    r = self.call_RequestAccess(uri, req, 
913                            self.cert_file, self.cert_pwd, self.trusted_certs)
914            except service_error, e:
915                if e.code == service_error.access:
916                    self.log.debug("[get_access] Access denied")
917                    r = None
918                    continue
919                else:
920                    raise e
921
922            if r.has_key('RequestAccessResponseBody'):
923                # Through to here we have a valid response, not a fault.
924                # Access denied is a fault, so something better or worse than
925                # access denied has happened.
926                r = r['RequestAccessResponseBody']
927                self.log.debug("[get_access] Access granted")
928                break
929            else:
930                raise service_error(service_error.protocol,
931                        "Bad proxy response")
932       
933        if not r:
934            raise service_error(service_error.access, 
935                    "Access denied by %s (%s)" % (tb, uri))
936
937        tbparam[tb] = { 
938                "allocID" : r['allocID'],
939                "uri": uri,
940                }
941
942        # Collect the responses corresponding to the services this testbed
943        # exports.  These will be the service requests that we will include in
944        # the start segment requests (with appropriate visibility values) to
945        # import and export the segments.
946        for s in r.get('service', []):
947            id = s.get('id', None)
948            if id and id in e_keys:
949                e_keys[id].reqs.append(s)
950
951        # Add attributes to parameter space.  We don't allow attributes to
952        # overlay any parameters already installed.
953        for a in r.get('fedAttr', []):
954            try:
955                if a['attribute'] and \
956                        isinstance(a['attribute'], basestring)\
957                        and not tbparam[tb].has_key(a['attribute'].lower()):
958                    tbparam[tb][a['attribute'].lower()] = a['value']
959            except KeyError:
960                self.log.error("Bad attribute in response: %s" % a)
961
962    def release_access(self, tb, aid, uri=None):
963        """
964        Release access to testbed through fedd
965        """
966
967        if not uri:
968            uri = self.tbmap.get(tb, None)
969        if not uri:
970            raise service_error(service_error.server_config, 
971                    "Unknown testbed: %s" % tb)
972
973        if self.local_access.has_key(uri):
974            resp = self.local_access[uri].ReleaseAccess(\
975                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
976                    fedid(file=self.cert_file))
977            resp = { 'ReleaseAccessResponseBody': resp } 
978        else:
979            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
980                    self.cert_file, self.cert_pwd, self.trusted_certs)
981
982        # better error coding
983
984    def remote_ns2topdl(self, uri, desc):
985
986        req = {
987                'description' : { 'ns2description': desc },
988            }
989
990        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
991                self.trusted_certs)
992
993        if r.has_key('Ns2TopdlResponseBody'):
994            r = r['Ns2TopdlResponseBody']
995            ed = r.get('experimentdescription', None)
996            if ed.has_key('topdldescription'):
997                return topdl.Topology(**ed['topdldescription'])
998            else:
999                raise service_error(service_error.protocol, 
1000                        "Bad splitter response (no output)")
1001        else:
1002            raise service_error(service_error.protocol, "Bad splitter response")
1003
1004    class start_segment:
1005        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1006                cert_pwd=None, trusted_certs=None, caller=None,
1007                log_collector=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015            self.log_collector = log_collector
1016            self.response = None
1017            self.node = { }
1018
1019        #def make_map(self, resp):
1020            #if 'segmentdescription' in resp and \
1021                    #'topdldescription' in resp['segmentdescription']:
1022                #top = topdl.Topology(\
1023                        #**resp['segmentdescription']['topdldescription'])
1024                #for e in [e for e in top.elements \
1025                        #if isinstance(e, topdl.Computer)]:
1026                    #hn = e.get_attribute('hostname')
1027                    #if hn:
1028                        #for n in e.name:
1029                            #self.node[n] = hn
1030
1031        def make_map(self, resp):
1032            for e in resp.get('embedding', []):
1033                if 'toponame' in e and 'physname' in e:
1034                    self.node[e['toponame']] = e['physname'][0]
1035
1036        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1037            req = {
1038                    'allocID': { 'fedid' : aid }, 
1039                    'segmentdescription': { 
1040                        'topdldescription': topo.to_dict(),
1041                    },
1042                }
1043
1044            if connInfo:
1045                req['connection'] = connInfo
1046
1047            import_svcs = [ s for m in masters.values() \
1048                    for s in m if self.testbed in s.importers]
1049
1050            if import_svcs or self.testbed in masters:
1051                req['service'] = []
1052
1053            for s in import_svcs:
1054                for r in s.reqs:
1055                    sr = copy.deepcopy(r)
1056                    sr['visibility'] = 'import';
1057                    req['service'].append(sr)
1058
1059            for s in masters.get(self.testbed, []):
1060                for r in s.reqs:
1061                    sr = copy.deepcopy(r)
1062                    sr['visibility'] = 'export';
1063                    req['service'].append(sr)
1064
1065            if attrs:
1066                req['fedAttr'] = attrs
1067
1068            try:
1069                self.log.debug("Calling StartSegment at %s " % uri)
1070                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1071                        self.trusted_certs)
1072                if r.has_key('StartSegmentResponseBody'):
1073                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1074                            None)
1075                    if lval and self.log_collector:
1076                        for line in  lval.splitlines(True):
1077                            self.log_collector.write(line)
1078                    self.make_map(r['StartSegmentResponseBody'])
1079                    self.response = r
1080                else:
1081                    raise service_error(service_error.internal, 
1082                            "Bad response!?: %s" %r)
1083                return True
1084            except service_error, e:
1085                self.log.error("Start segment failed on %s: %s" % \
1086                        (self.testbed, e))
1087                return False
1088
1089
1090
1091    class terminate_segment:
1092        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1093                cert_pwd=None, trusted_certs=None, caller=None):
1094            self.log = log
1095            self.debug = debug
1096            self.cert_file = cert_file
1097            self.cert_pwd = cert_pwd
1098            self.trusted_certs = None
1099            self.caller = caller
1100            self.testbed = testbed
1101
1102        def __call__(self, uri, aid ):
1103            req = {
1104                    'allocID': aid , 
1105                }
1106            try:
1107                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1108                        self.trusted_certs)
1109                return True
1110            except service_error, e:
1111                self.log.error("Terminate segment failed on %s: %s" % \
1112                        (self.testbed, e))
1113                return False
1114   
1115
1116    def allocate_resources(self, allocated, masters, eid, expid, 
1117            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1118            attrs=None, connInfo={}):
1119
1120        started = { }           # Testbeds where a sub-experiment started
1121                                # successfully
1122
1123        # XXX
1124        fail_soft = False
1125
1126        log = alloc_log or self.log
1127
1128        thread_pool = self.thread_pool(self.nthreads)
1129        threads = [ ]
1130        starters = [ ]
1131
1132        for tb in allocated.keys():
1133            # Create and start a thread to start the segment, and save it
1134            # to get the return value later
1135            tb_attrs = copy.copy(attrs)
1136            thread_pool.wait_for_slot()
1137            uri = tbparams[tb].get('uri', \
1138                    self.tbmap.get(testbed_base(tb), None))
1139            base, suffix = split_testbed(tb)
1140            if suffix:
1141                tb_attrs.append({'attribute': 'experiment_name', 
1142                    'value': "%s-%s" % (eid, suffix)})
1143            else:
1144                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1145            if not uri:
1146                raise service_error(service_error.internal, 
1147                        "Unknown testbed %s !?" % tb)
1148
1149            if tbparams[tb].has_key('allocID') and \
1150                    tbparams[tb]['allocID'].has_key('fedid'):
1151                aid = tbparams[tb]['allocID']['fedid']
1152            else:
1153                raise service_error(service_error.internal, 
1154                        "No alloc id for testbed %s !?" % tb)
1155
1156            s = self.start_segment(log=log, debug=self.debug,
1157                    testbed=tb, cert_file=self.cert_file,
1158                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1159                    caller=self.call_StartSegment,
1160                    log_collector=log_collector)
1161            starters.append(s)
1162            t  = self.pooled_thread(\
1163                    target=s, name=tb,
1164                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1165                    pdata=thread_pool, trace_file=self.trace_file)
1166            threads.append(t)
1167            t.start()
1168
1169        # Wait until all finish (keep pinging the log, though)
1170        mins = 0
1171        revoked = False
1172        while not thread_pool.wait_for_all_done(60.0):
1173            mins += 1
1174            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1175                    % mins)
1176            if not revoked and \
1177                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1178                # a testbed has failed.  Revoke this experiment's
1179                # synchronizarion values so that sub experiments will not
1180                # deadlock waiting for synchronization that will never happen
1181                self.log.info("A subexperiment has failed to swap in, " + \
1182                        "revoking synch keys")
1183                var_key = "fedid:%s" % expid
1184                for k in self.synch_store.all_keys():
1185                    if len(k) > 45 and k[0:46] == var_key:
1186                        self.synch_store.revoke_key(k)
1187                revoked = True
1188
1189        failed = [ t.getName() for t in threads if not t.rv ]
1190        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1191
1192        # If one failed clean up, unless fail_soft is set
1193        if failed:
1194            if not fail_soft:
1195                thread_pool.clear()
1196                for tb in succeeded:
1197                    # Create and start a thread to stop the segment
1198                    thread_pool.wait_for_slot()
1199                    uri = tbparams[tb]['uri']
1200                    t  = self.pooled_thread(\
1201                            target=self.terminate_segment(log=log,
1202                                testbed=tb,
1203                                cert_file=self.cert_file, 
1204                                cert_pwd=self.cert_pwd,
1205                                trusted_certs=self.trusted_certs,
1206                                caller=self.call_TerminateSegment),
1207                            args=(uri, tbparams[tb]['federant']['allocID']),
1208                            name=tb,
1209                            pdata=thread_pool, trace_file=self.trace_file)
1210                    t.start()
1211                # Wait until all finish (if any are being stopped)
1212                if succeeded:
1213                    thread_pool.wait_for_all_done()
1214
1215                # release the allocations
1216                for tb in tbparams.keys():
1217                    self.release_access(tb, tbparams[tb]['allocID'],
1218                            tbparams[tb].get('uri', None))
1219                # Remove the placeholder
1220                self.state_lock.acquire()
1221                self.state[eid]['experimentStatus'] = 'failed'
1222                if self.state_filename: self.write_state()
1223                self.state_lock.release()
1224
1225                log.error("Swap in failed on %s" % ",".join(failed))
1226                return
1227        else:
1228            # Walk through the successes and gather the virtual to physical
1229            # mapping.
1230            node = { }
1231            for s in starters:
1232                node.update(s.node)
1233            # Assign the mapping as a hostname attribute
1234            for e in [ e for e in top.elements \
1235                    if isinstance(e, topdl.Computer)]:
1236                for n in e.name:
1237                    if n in node:
1238                        e.set_attribute('hostname', node[n])
1239            log.info("[start_segment]: Experiment %s active" % eid)
1240
1241
1242        # Walk up tmpdir, deleting as we go
1243        if self.cleanup:
1244            log.debug("[start_experiment]: removing %s" % tmpdir)
1245            for path, dirs, files in os.walk(tmpdir, topdown=False):
1246                for f in files:
1247                    os.remove(os.path.join(path, f))
1248                for d in dirs:
1249                    os.rmdir(os.path.join(path, d))
1250            os.rmdir(tmpdir)
1251        else:
1252            log.debug("[start_experiment]: not removing %s" % tmpdir)
1253
1254        # Insert the experiment into our state and update the disk copy.
1255        self.state_lock.acquire()
1256        self.state[expid]['experimentStatus'] = 'active'
1257        self.state[eid] = self.state[expid]
1258        self.state[eid]['experimentdescription']['topdldescription'] = \
1259                top.to_dict()
1260        if self.state_filename: self.write_state()
1261        self.state_lock.release()
1262        return
1263
1264
1265    def add_kit(self, e, kit):
1266        """
1267        Add a Software object created from the list of (install, location)
1268        tuples passed as kit  to the software attribute of an object e.  We
1269        do this enough to break out the code, but it's kind of a hack to
1270        avoid changing the old tuple rep.
1271        """
1272
1273        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1274
1275        if isinstance(e.software, list): e.software.extend(s)
1276        else: e.software = s
1277
1278
1279    def create_experiment_state(self, fid, req, expid, expcert,
1280            state='starting'):
1281        """
1282        Create the initial entry in the experiment's state.  The expid and
1283        expcert are the experiment's fedid and certifacte that represents that
1284        ID, which are installed in the experiment state.  If the request
1285        includes a suggested local name that is used if possible.  If the local
1286        name is already taken by an experiment owned by this user that has
1287        failed, it is overwritten.  Otherwise new letters are added until a
1288        valid localname is found.  The generated local name is returned.
1289        """
1290
1291        if req.has_key('experimentID') and \
1292                req['experimentID'].has_key('localname'):
1293            overwrite = False
1294            eid = req['experimentID']['localname']
1295            # If there's an old failed experiment here with the same local name
1296            # and accessible by this user, we'll overwrite it, otherwise we'll
1297            # fall through and do the collision avoidance.
1298            old_expid = self.get_experiment_fedid(eid)
1299            if old_expid and self.check_experiment_access(fid, old_expid):
1300                self.state_lock.acquire()
1301                status = self.state[eid].get('experimentStatus', None)
1302                if status and status == 'failed':
1303                    # remove the old access attribute
1304                    self.auth.unset_attribute(fid, old_expid)
1305                    overwrite = True
1306                    del self.state[eid]
1307                    del self.state[old_expid]
1308                self.state_lock.release()
1309            self.state_lock.acquire()
1310            while (self.state.has_key(eid) and not overwrite):
1311                eid += random.choice(string.ascii_letters)
1312            # Initial state
1313            self.state[eid] = {
1314                    'experimentID' : \
1315                            [ { 'localname' : eid }, {'fedid': expid } ],
1316                    'experimentStatus': state,
1317                    'experimentAccess': { 'X509' : expcert },
1318                    'owner': fid,
1319                    'log' : [],
1320                }
1321            self.state[expid] = self.state[eid]
1322            if self.state_filename: self.write_state()
1323            self.state_lock.release()
1324        else:
1325            eid = self.exp_stem
1326            for i in range(0,5):
1327                eid += random.choice(string.ascii_letters)
1328            self.state_lock.acquire()
1329            while (self.state.has_key(eid)):
1330                eid = self.exp_stem
1331                for i in range(0,5):
1332                    eid += random.choice(string.ascii_letters)
1333            # Initial state
1334            self.state[eid] = {
1335                    'experimentID' : \
1336                            [ { 'localname' : eid }, {'fedid': expid } ],
1337                    'experimentStatus': state,
1338                    'experimentAccess': { 'X509' : expcert },
1339                    'owner': fid,
1340                    'log' : [],
1341                }
1342            self.state[expid] = self.state[eid]
1343            if self.state_filename: self.write_state()
1344            self.state_lock.release()
1345
1346        return eid
1347
1348
1349    def allocate_ips_to_topo(self, top):
1350        """
1351        Add an ip4_address attribute to all the hosts in the topology, based on
1352        the shared substrates on which they sit.  An /etc/hosts file is also
1353        created and returned as a list of hostfiles entries.  We also return
1354        the allocator, because we may need to allocate IPs to portals
1355        (specifically DRAGON portals).
1356        """
1357        subs = sorted(top.substrates, 
1358                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1359                reverse=True)
1360        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1361        ifs = { }
1362        hosts = [ ]
1363
1364        for idx, s in enumerate(subs):
1365            net_size = len(s.interfaces)+2
1366
1367            a = ips.allocate(net_size)
1368            if a :
1369                base, num = a
1370                if num < net_size: 
1371                    raise service_error(service_error.internal,
1372                            "Allocator returned wrong number of IPs??")
1373            else:
1374                raise service_error(service_error.req, 
1375                        "Cannot allocate IP addresses")
1376            mask = ips.min_alloc
1377            while mask < net_size:
1378                mask *= 2
1379
1380            netmask = ((2**32-1) ^ (mask-1))
1381
1382            base += 1
1383            for i in s.interfaces:
1384                i.attribute.append(
1385                        topdl.Attribute('ip4_address', 
1386                            "%s" % ip_addr(base)))
1387                i.attribute.append(
1388                        topdl.Attribute('ip4_netmask', 
1389                            "%s" % ip_addr(int(netmask))))
1390
1391                hname = i.element.name
1392                if ifs.has_key(hname):
1393                    hosts.append("%s\t%s-%s %s-%d" % \
1394                            (ip_addr(base), hname, s.name, hname,
1395                                ifs[hname]))
1396                else:
1397                    ifs[hname] = 0
1398                    hosts.append("%s\t%s-%s %s-%d %s" % \
1399                            (ip_addr(base), hname, s.name, hname,
1400                                ifs[hname], hname))
1401
1402                ifs[hname] += 1
1403                base += 1
1404        return hosts, ips
1405
1406    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1407            tbparams, masters):
1408        """
1409        Request access to the various testbeds required for this instantiation
1410        (passed in as testbeds).  User, access_user, expoert_project and master
1411        are used to construct the correct requests.  Per-testbed parameters are
1412        returned in tbparams.
1413        """
1414        for tb in testbeds:
1415            self.get_access(tb, None, tbparams, access_user, masters)
1416            allocated[tb] = 1
1417
1418    def split_topology(self, top, topo, testbeds):
1419        """
1420        Create the sub-topologies that are needed for experiment instantiation.
1421        """
1422        for tb in testbeds:
1423            topo[tb] = top.clone()
1424            # copy in for loop allows deletions from the original
1425            for e in [ e for e in topo[tb].elements]:
1426                etb = e.get_attribute('testbed')
1427                # NB: elements without a testbed attribute won't appear in any
1428                # sub topologies. 
1429                if not etb or etb != tb:
1430                    for i in e.interface:
1431                        for s in i.subs:
1432                            try:
1433                                s.interfaces.remove(i)
1434                            except ValueError:
1435                                raise service_error(service_error.internal,
1436                                        "Can't remove interface??")
1437                    topo[tb].elements.remove(e)
1438            topo[tb].make_indices()
1439
1440    def wrangle_software(self, expid, top, topo, tbparams):
1441        """
1442        Copy software out to the repository directory, allocate permissions and
1443        rewrite the segment topologies to look for the software in local
1444        places.
1445        """
1446
1447        # Copy the rpms and tarfiles to a distribution directory from
1448        # which the federants can retrieve them
1449        linkpath = "%s/software" %  expid
1450        softdir ="%s/%s" % ( self.repodir, linkpath)
1451        softmap = { }
1452        # These are in a list of tuples format (each kit).  This comprehension
1453        # unwraps them into a single list of tuples that initilaizes the set of
1454        # tuples.
1455        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1456                for p, t in l ])
1457        pkgs.update([x.location for e in top.elements \
1458                for x in e.software])
1459        try:
1460            os.makedirs(softdir)
1461        except EnvironmentError, e:
1462            raise service_error(
1463                    "Cannot create software directory: %s" % e)
1464        # The actual copying.  Everything's converted into a url for copying.
1465        for pkg in pkgs:
1466            loc = pkg
1467
1468            scheme, host, path = urlparse(loc)[0:3]
1469            dest = os.path.basename(path)
1470            if not scheme:
1471                if not loc.startswith('/'):
1472                    loc = "/%s" % loc
1473                loc = "file://%s" %loc
1474            try:
1475                u = urlopen(loc)
1476            except Exception, e:
1477                raise service_error(service_error.req, 
1478                        "Cannot open %s: %s" % (loc, e))
1479            try:
1480                f = open("%s/%s" % (softdir, dest) , "w")
1481                self.log.debug("Writing %s/%s" % (softdir,dest) )
1482                data = u.read(4096)
1483                while data:
1484                    f.write(data)
1485                    data = u.read(4096)
1486                f.close()
1487                u.close()
1488            except Exception, e:
1489                raise service_error(service_error.internal,
1490                        "Could not copy %s: %s" % (loc, e))
1491            path = re.sub("/tmp", "", linkpath)
1492            # XXX
1493            softmap[pkg] = \
1494                    "%s/%s/%s" %\
1495                    ( self.repo_url, path, dest)
1496
1497            # Allow the individual segments to access the software.
1498            for tb in tbparams.keys():
1499                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1500                        "/%s/%s" % ( path, dest))
1501
1502        # Convert the software locations in the segments into the local
1503        # copies on this host
1504        for soft in [ s for tb in topo.values() \
1505                for e in tb.elements \
1506                    if getattr(e, 'software', False) \
1507                        for s in e.software ]:
1508            if softmap.has_key(soft.location):
1509                soft.location = softmap[soft.location]
1510
1511
1512    def new_experiment(self, req, fid):
1513        """
1514        The external interface to empty initial experiment creation called from
1515        the dispatcher.
1516
1517        Creates a working directory, splits the incoming description using the
1518        splitter script and parses out the avrious subsections using the
1519        lcasses above.  Once each sub-experiment is created, use pooled threads
1520        to instantiate them and start it all up.
1521        """
1522        if not self.auth.check_attribute(fid, 'new'):
1523            raise service_error(service_error.access, "New access denied")
1524
1525        try:
1526            tmpdir = tempfile.mkdtemp(prefix="split-")
1527        except EnvironmentError:
1528            raise service_error(service_error.internal, "Cannot create tmp dir")
1529
1530        try:
1531            access_user = self.accessdb[fid]
1532        except KeyError:
1533            raise service_error(service_error.internal,
1534                    "Access map and authorizer out of sync in " + \
1535                            "new_experiment for fedid %s"  % fid)
1536
1537        pid = "dummy"
1538        gid = "dummy"
1539
1540        req = req.get('NewRequestBody', None)
1541        if not req:
1542            raise service_error(service_error.req,
1543                    "Bad request format (no NewRequestBody)")
1544
1545        # Generate an ID for the experiment (slice) and a certificate that the
1546        # allocator can use to prove they own it.  We'll ship it back through
1547        # the encrypted connection.
1548        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1549
1550        #now we're done with the tmpdir, and it should be empty
1551        if self.cleanup:
1552            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1553            os.rmdir(tmpdir)
1554        else:
1555            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1556
1557        eid = self.create_experiment_state(fid, req, expid, expcert, 
1558                state='empty')
1559
1560        # Let users touch the state
1561        self.auth.set_attribute(fid, expid)
1562        self.auth.set_attribute(expid, expid)
1563        # Override fedids can manipulate state as well
1564        for o in self.overrides:
1565            self.auth.set_attribute(o, expid)
1566
1567        rv = {
1568                'experimentID': [
1569                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1570                ],
1571                'experimentStatus': 'empty',
1572                'experimentAccess': { 'X509' : expcert }
1573            }
1574
1575        return rv
1576
1577    def create_experiment(self, req, fid):
1578        """
1579        The external interface to experiment creation called from the
1580        dispatcher.
1581
1582        Creates a working directory, splits the incoming description using the
1583        splitter script and parses out the various subsections using the
1584        classes above.  Once each sub-experiment is created, use pooled threads
1585        to instantiate them and start it all up.
1586        """
1587
1588        req = req.get('CreateRequestBody', None)
1589        if not req:
1590            raise service_error(service_error.req,
1591                    "Bad request format (no CreateRequestBody)")
1592
1593        # Get the experiment access
1594        exp = req.get('experimentID', None)
1595        if exp:
1596            if exp.has_key('fedid'):
1597                key = exp['fedid']
1598                expid = key
1599                eid = None
1600            elif exp.has_key('localname'):
1601                key = exp['localname']
1602                eid = key
1603                expid = None
1604            else:
1605                raise service_error(service_error.req, "Unknown lookup type")
1606        else:
1607            raise service_error(service_error.req, "No request?")
1608
1609        self.check_experiment_access(fid, key)
1610
1611        try:
1612            tmpdir = tempfile.mkdtemp(prefix="split-")
1613            os.mkdir(tmpdir+"/keys")
1614        except EnvironmentError:
1615            raise service_error(service_error.internal, "Cannot create tmp dir")
1616
1617        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1618        gw_secretkey_base = "fed.%s" % self.ssh_type
1619        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1620        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1621        tclfile = tmpdir + "/experiment.tcl"
1622        tbparams = { }
1623        try:
1624            access_user = self.accessdb[fid]
1625        except KeyError:
1626            raise service_error(service_error.internal,
1627                    "Access map and authorizer out of sync in " + \
1628                            "create_experiment for fedid %s"  % fid)
1629
1630        pid = "dummy"
1631        gid = "dummy"
1632
1633        # The tcl parser needs to read a file so put the content into that file
1634        descr=req.get('experimentdescription', None)
1635        if descr:
1636            file_content=descr.get('ns2description', None)
1637            if file_content:
1638                try:
1639                    f = open(tclfile, 'w')
1640                    f.write(file_content)
1641                    f.close()
1642                except EnvironmentError:
1643                    raise service_error(service_error.internal,
1644                            "Cannot write temp experiment description")
1645            else:
1646                raise service_error(service_error.req, 
1647                        "Only ns2descriptions supported")
1648        else:
1649            raise service_error(service_error.req, "No experiment description")
1650
1651        self.state_lock.acquire()
1652        if self.state.has_key(key):
1653            self.state[key]['experimentStatus'] = "starting"
1654            for e in self.state[key].get('experimentID',[]):
1655                if not expid and e.has_key('fedid'):
1656                    expid = e['fedid']
1657                elif not eid and e.has_key('localname'):
1658                    eid = e['localname']
1659        self.state_lock.release()
1660
1661        if not (eid and expid):
1662            raise service_error(service_error.internal, 
1663                    "Cannot find local experiment info!?")
1664
1665        try: 
1666            # This catches exceptions to clear the placeholder if necessary
1667            try:
1668                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1669            except ValueError:
1670                raise service_error(service_error.server_config, 
1671                        "Bad key type (%s)" % self.ssh_type)
1672
1673            # Copy the service request
1674            tb_services = [ s for s in req.get('service',[]) ]
1675            # Translate to topdl
1676            if self.splitter_url:
1677                self.log.debug("Calling remote topdl translator at %s" % \
1678                        self.splitter_url)
1679                top = self.remote_ns2topdl(self.splitter_url, file_content)
1680            else:
1681                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1682                    str(self.muxmax), '-m', 'dummy']
1683
1684                tclcmd.extend([pid, gid, eid, tclfile])
1685
1686                self.log.debug("running local splitter %s", " ".join(tclcmd))
1687                # This is just fantastic.  As a side effect the parser copies
1688                # tb_compat.tcl into the current directory, so that directory
1689                # must be writable by the fedd user.  Doing this in the
1690                # temporary subdir ensures this is the case.
1691                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1692                        cwd=tmpdir)
1693                split_data = tclparser.stdout
1694
1695                top = topdl.topology_from_xml(file=split_data, top="experiment")
1696
1697            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1698            # Find the testbeds to look up
1699            testbeds = set([ a.value for e in top.elements \
1700                    for a in e.attribute \
1701                        if a.attribute == 'testbed'])
1702
1703            masters = { }           # testbeds exporting services
1704            for s in tb_services:
1705                # If this is a project_export request with the importall field
1706                # set, fill it out.
1707
1708                if s.get('importall', False):
1709                    s['import'] = [ tb for tb in testbeds \
1710                            if tb not in s.get('export',[])]
1711                    del s['importall']
1712
1713                # Add the service to masters
1714                for tb in s.get('export', []):
1715                    if s.get('name', None):
1716                        if tb not in masters:
1717                            masters[tb] = [ ]
1718
1719                        params = { }
1720                        if 'fedAttr' in s:
1721                            for a in s['fedAttr']:
1722                                params[a.get('attribute', '')] = \
1723                                        a.get('value','')
1724
1725                        masters[tb].append(federated_service(name=s['name'],
1726                                exporter=tb, importers=s.get('import',[]),
1727                                params=params, reqs=[]))
1728                    else:
1729                        self.log.error('Testbed service does not have name " + \
1730                                "and importers')
1731
1732
1733            allocated = { }         # Testbeds we can access
1734            topo ={ }               # Sub topologies
1735            connInfo = { }          # Connection information
1736            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1737                    tbparams, masters)
1738
1739            self.split_topology(top, topo, testbeds)
1740
1741            # Copy configuration files into the remote file store
1742            # The config urlpath
1743            configpath = "/%s/config" % expid
1744            # The config file system location
1745            configdir ="%s%s" % ( self.repodir, configpath)
1746            try:
1747                os.makedirs(configdir)
1748            except EnvironmentError, e:
1749                raise service_error(service_error.internal,
1750                        "Cannot create config directory: %s" % e)
1751            try:
1752                f = open("%s/hosts" % configdir, "w")
1753                f.write('\n'.join(hosts))
1754                f.close()
1755            except EnvironmentError, e:
1756                raise service_error(service_error.internal, 
1757                        "Cannot write hosts file: %s" % e)
1758            try:
1759                copy_file("%s" % gw_pubkey, "%s/%s" % \
1760                        (configdir, gw_pubkey_base))
1761                copy_file("%s" % gw_secretkey, "%s/%s" % \
1762                        (configdir, gw_secretkey_base))
1763            except EnvironmentError, e:
1764                raise service_error(service_error.internal, 
1765                        "Cannot copy keyfiles: %s" % e)
1766
1767            # Allow the individual testbeds to access the configuration files.
1768            for tb in tbparams.keys():
1769                asignee = tbparams[tb]['allocID']['fedid']
1770                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1771                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1772
1773            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1774                    self.muxmax, self.direct_transit)
1775            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1776                    connInfo, expid)
1777            # Now get access to the dynamic testbeds (those added above)
1778            for tb in [ t for t in topo if t not in allocated]:
1779                self.get_access(tb, None, tbparams, access_user, masters)
1780                allocated[tb] = 1
1781                store_keys = topo[tb].get_attribute('store_keys')
1782                # Give the testbed access to keys it exports or imports
1783                if store_keys:
1784                    for sk in store_keys.split(" "):
1785                        self.auth.set_attribute(\
1786                                tbparams[tb]['allocID']['fedid'], sk)
1787
1788            self.wrangle_software(expid, top, topo, tbparams)
1789
1790            vtopo = topdl.topology_to_vtopo(top)
1791            vis = self.genviz(vtopo)
1792
1793            # save federant information
1794            for k in allocated.keys():
1795                tbparams[k]['federant'] = {
1796                        'name': [ { 'localname' : eid} ],
1797                        'allocID' : tbparams[k]['allocID'],
1798                        'uri': tbparams[k]['uri'],
1799                    }
1800
1801            self.state_lock.acquire()
1802            self.state[eid]['vtopo'] = vtopo
1803            self.state[eid]['vis'] = vis
1804            self.state[eid]['experimentdescription'] = \
1805                    { 'topdldescription': top.to_dict() }
1806            self.state[eid]['federant'] = \
1807                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1808                        if tbparams[tb].has_key('federant') ]
1809            if self.state_filename: 
1810                self.write_state()
1811            self.state_lock.release()
1812        except service_error, e:
1813            # If something goes wrong in the parse (usually an access error)
1814            # clear the placeholder state.  From here on out the code delays
1815            # exceptions.  Failing at this point returns a fault to the remote
1816            # caller.
1817
1818            self.state_lock.acquire()
1819            del self.state[eid]
1820            del self.state[expid]
1821            if self.state_filename: self.write_state()
1822            self.state_lock.release()
1823            raise e
1824
1825
1826        # Start the background swapper and return the starting state.  From
1827        # here on out, the state will stick around a while.
1828
1829        # Let users touch the state
1830        self.auth.set_attribute(fid, expid)
1831        self.auth.set_attribute(expid, expid)
1832        # Override fedids can manipulate state as well
1833        for o in self.overrides:
1834            self.auth.set_attribute(o, expid)
1835
1836        # Create a logger that logs to the experiment's state object as well as
1837        # to the main log file.
1838        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1839        alloc_collector = self.list_log(self.state[eid]['log'])
1840        h = logging.StreamHandler(alloc_collector)
1841        # XXX: there should be a global one of these rather than repeating the
1842        # code.
1843        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1844                    '%d %b %y %H:%M:%S'))
1845        alloc_log.addHandler(h)
1846       
1847        attrs = [ 
1848                {
1849                    'attribute': 'ssh_pubkey', 
1850                    'value': '%s/%s/config/%s' % \
1851                            (self.repo_url, expid, gw_pubkey_base)
1852                },
1853                {
1854                    'attribute': 'ssh_secretkey', 
1855                    'value': '%s/%s/config/%s' % \
1856                            (self.repo_url, expid, gw_secretkey_base)
1857                },
1858                {
1859                    'attribute': 'hosts', 
1860                    'value': '%s/%s/config/hosts' % \
1861                            (self.repo_url, expid)
1862                },
1863            ]
1864
1865        # transit and disconnected testbeds may not have a connInfo entry.
1866        # Fill in the blanks.
1867        for t in allocated.keys():
1868            if not connInfo.has_key(t):
1869                connInfo[t] = { }
1870
1871        # Start a thread to do the resource allocation
1872        t  = Thread(target=self.allocate_resources,
1873                args=(allocated, masters, eid, expid, tbparams, 
1874                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1875                    connInfo),
1876                name=eid)
1877        t.start()
1878
1879        rv = {
1880                'experimentID': [
1881                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1882                ],
1883                'experimentStatus': 'starting',
1884            }
1885
1886        return rv
1887   
1888    def get_experiment_fedid(self, key):
1889        """
1890        find the fedid associated with the localname key in the state database.
1891        """
1892
1893        rv = None
1894        self.state_lock.acquire()
1895        if self.state.has_key(key):
1896            if isinstance(self.state[key], dict):
1897                try:
1898                    kl = [ f['fedid'] for f in \
1899                            self.state[key]['experimentID']\
1900                                if f.has_key('fedid') ]
1901                except KeyError:
1902                    self.state_lock.release()
1903                    raise service_error(service_error.internal, 
1904                            "No fedid for experiment %s when getting "+\
1905                                    "fedid(!?)" % key)
1906                if len(kl) == 1:
1907                    rv = kl[0]
1908                else:
1909                    self.state_lock.release()
1910                    raise service_error(service_error.internal, 
1911                            "multiple fedids for experiment %s when " +\
1912                                    "getting fedid(!?)" % key)
1913            else:
1914                self.state_lock.release()
1915                raise service_error(service_error.internal, 
1916                        "Unexpected state for %s" % key)
1917        self.state_lock.release()
1918        return rv
1919
1920    def check_experiment_access(self, fid, key):
1921        """
1922        Confirm that the fid has access to the experiment.  Though a request
1923        may be made in terms of a local name, the access attribute is always
1924        the experiment's fedid.
1925        """
1926        if not isinstance(key, fedid):
1927            key = self.get_experiment_fedid(key)
1928
1929        if self.auth.check_attribute(fid, key):
1930            return True
1931        else:
1932            raise service_error(service_error.access, "Access Denied")
1933
1934
1935    def get_handler(self, path, fid):
1936        self.log.info("Get handler %s %s" % (path, fid))
1937        if self.auth.check_attribute(fid, path):
1938            return ("%s/%s" % (self.repodir, path), "application/binary")
1939        else:
1940            return (None, None)
1941
1942    def get_vtopo(self, req, fid):
1943        """
1944        Return the stored virtual topology for this experiment
1945        """
1946        rv = None
1947        state = None
1948
1949        req = req.get('VtopoRequestBody', None)
1950        if not req:
1951            raise service_error(service_error.req,
1952                    "Bad request format (no VtopoRequestBody)")
1953        exp = req.get('experiment', None)
1954        if exp:
1955            if exp.has_key('fedid'):
1956                key = exp['fedid']
1957                keytype = "fedid"
1958            elif exp.has_key('localname'):
1959                key = exp['localname']
1960                keytype = "localname"
1961            else:
1962                raise service_error(service_error.req, "Unknown lookup type")
1963        else:
1964            raise service_error(service_error.req, "No request?")
1965
1966        self.check_experiment_access(fid, key)
1967
1968        self.state_lock.acquire()
1969        if self.state.has_key(key):
1970            if self.state[key].has_key('vtopo'):
1971                rv = { 'experiment' : {keytype: key },\
1972                        'vtopo': self.state[key]['vtopo'],\
1973                    }
1974            else:
1975                state = self.state[key]['experimentStatus']
1976        self.state_lock.release()
1977
1978        if rv: return rv
1979        else: 
1980            if state:
1981                raise service_error(service_error.partial, 
1982                        "Not ready: %s" % state)
1983            else:
1984                raise service_error(service_error.req, "No such experiment")
1985
1986    def get_vis(self, req, fid):
1987        """
1988        Return the stored visualization for this experiment
1989        """
1990        rv = None
1991        state = None
1992
1993        req = req.get('VisRequestBody', None)
1994        if not req:
1995            raise service_error(service_error.req,
1996                    "Bad request format (no VisRequestBody)")
1997        exp = req.get('experiment', None)
1998        if exp:
1999            if exp.has_key('fedid'):
2000                key = exp['fedid']
2001                keytype = "fedid"
2002            elif exp.has_key('localname'):
2003                key = exp['localname']
2004                keytype = "localname"
2005            else:
2006                raise service_error(service_error.req, "Unknown lookup type")
2007        else:
2008            raise service_error(service_error.req, "No request?")
2009
2010        self.check_experiment_access(fid, key)
2011
2012        self.state_lock.acquire()
2013        if self.state.has_key(key):
2014            if self.state[key].has_key('vis'):
2015                rv =  { 'experiment' : {keytype: key },\
2016                        'vis': self.state[key]['vis'],\
2017                        }
2018            else:
2019                state = self.state[key]['experimentStatus']
2020        self.state_lock.release()
2021
2022        if rv: return rv
2023        else:
2024            if state:
2025                raise service_error(service_error.partial, 
2026                        "Not ready: %s" % state)
2027            else:
2028                raise service_error(service_error.req, "No such experiment")
2029
2030    def clean_info_response(self, rv):
2031        """
2032        Remove the information in the experiment's state object that is not in
2033        the info response.
2034        """
2035        # Remove the owner info (should always be there, but...)
2036        if rv.has_key('owner'): del rv['owner']
2037
2038        # Convert the log into the allocationLog parameter and remove the
2039        # log entry (with defensive programming)
2040        if rv.has_key('log'):
2041            rv['allocationLog'] = "".join(rv['log'])
2042            del rv['log']
2043        else:
2044            rv['allocationLog'] = ""
2045
2046        if rv['experimentStatus'] != 'active':
2047            if rv.has_key('federant'): del rv['federant']
2048        else:
2049            # remove the allocationID and uri info from each federant
2050            for f in rv.get('federant', []):
2051                if f.has_key('allocID'): del f['allocID']
2052                if f.has_key('uri'): del f['uri']
2053
2054        return rv
2055
2056    def get_info(self, req, fid):
2057        """
2058        Return all the stored info about this experiment
2059        """
2060        rv = None
2061
2062        req = req.get('InfoRequestBody', None)
2063        if not req:
2064            raise service_error(service_error.req,
2065                    "Bad request format (no InfoRequestBody)")
2066        exp = req.get('experiment', None)
2067        if exp:
2068            if exp.has_key('fedid'):
2069                key = exp['fedid']
2070                keytype = "fedid"
2071            elif exp.has_key('localname'):
2072                key = exp['localname']
2073                keytype = "localname"
2074            else:
2075                raise service_error(service_error.req, "Unknown lookup type")
2076        else:
2077            raise service_error(service_error.req, "No request?")
2078
2079        self.check_experiment_access(fid, key)
2080
2081        # The state may be massaged by the service function that called
2082        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2083        # state.
2084        self.state_lock.acquire()
2085        if self.state.has_key(key):
2086            rv = copy.deepcopy(self.state[key])
2087        self.state_lock.release()
2088
2089        if rv:
2090            return self.clean_info_response(rv)
2091        else:
2092            raise service_error(service_error.req, "No such experiment")
2093
2094    def get_multi_info(self, req, fid):
2095        """
2096        Return all the stored info that this fedid can access
2097        """
2098        rv = { 'info': [ ] }
2099
2100        self.state_lock.acquire()
2101        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2102            try:
2103                self.check_experiment_access(fid, key)
2104            except service_error, e:
2105                if e.code == service_error.access:
2106                    continue
2107                else:
2108                    self.state_lock.release()
2109                    raise e
2110
2111            if self.state.has_key(key):
2112                e = copy.deepcopy(self.state[key])
2113                e = self.clean_info_response(e)
2114                rv['info'].append(e)
2115        self.state_lock.release()
2116        return rv
2117
2118    def terminate_experiment(self, req, fid):
2119        """
2120        Swap this experiment out on the federants and delete the shared
2121        information
2122        """
2123        tbparams = { }
2124        req = req.get('TerminateRequestBody', None)
2125        if not req:
2126            raise service_error(service_error.req,
2127                    "Bad request format (no TerminateRequestBody)")
2128        force = req.get('force', False)
2129        exp = req.get('experiment', None)
2130        if exp:
2131            if exp.has_key('fedid'):
2132                key = exp['fedid']
2133                keytype = "fedid"
2134            elif exp.has_key('localname'):
2135                key = exp['localname']
2136                keytype = "localname"
2137            else:
2138                raise service_error(service_error.req, "Unknown lookup type")
2139        else:
2140            raise service_error(service_error.req, "No request?")
2141
2142        self.check_experiment_access(fid, key)
2143
2144        dealloc_list = [ ]
2145
2146
2147        # Create a logger that logs to the dealloc_list as well as to the main
2148        # log file.
2149        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2150        h = logging.StreamHandler(self.list_log(dealloc_list))
2151        # XXX: there should be a global one of these rather than repeating the
2152        # code.
2153        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2154                    '%d %b %y %H:%M:%S'))
2155        dealloc_log.addHandler(h)
2156
2157        self.state_lock.acquire()
2158        fed_exp = self.state.get(key, None)
2159
2160        if fed_exp:
2161            # This branch of the conditional holds the lock to generate a
2162            # consistent temporary tbparams variable to deallocate experiments.
2163            # It releases the lock to do the deallocations and reacquires it to
2164            # remove the experiment state when the termination is complete.
2165
2166            # First make sure that the experiment creation is complete.
2167            status = fed_exp.get('experimentStatus', None)
2168
2169            if status:
2170                if status in ('starting', 'terminating'):
2171                    if not force:
2172                        self.state_lock.release()
2173                        raise service_error(service_error.partial, 
2174                                'Experiment still being created or destroyed')
2175                    else:
2176                        self.log.warning('Experiment in %s state ' % status + \
2177                                'being terminated by force.')
2178            else:
2179                # No status??? trouble
2180                self.state_lock.release()
2181                raise service_error(service_error.internal,
2182                        "Experiment has no status!?")
2183
2184            ids = []
2185            #  experimentID is a list of dicts that are self-describing
2186            #  identifiers.  This finds all the fedids and localnames - the
2187            #  keys of self.state - and puts them into ids.
2188            for id in fed_exp.get('experimentID', []):
2189                if id.has_key('fedid'): ids.append(id['fedid'])
2190                if id.has_key('localname'): ids.append(id['localname'])
2191
2192            # Collect the allocation/segment ids into a dict keyed by the fedid
2193            # of the allocation (or a monotonically increasing integer) that
2194            # contains a tuple of uri, aid (which is a dict...)
2195            for i, fed in enumerate(fed_exp.get('federant', [])):
2196                try:
2197                    uri = fed['uri']
2198                    aid = fed['allocID']
2199                    k = fed['allocID'].get('fedid', i)
2200                except KeyError, e:
2201                    continue
2202                tbparams[k] = (uri, aid)
2203            fed_exp['experimentStatus'] = 'terminating'
2204            if self.state_filename: self.write_state()
2205            self.state_lock.release()
2206
2207            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2208            # then completes, so we can't wait if nothing starts.  So, no
2209            # tbparams, no start.
2210            if len(tbparams) > 0:
2211                thread_pool = self.thread_pool(self.nthreads)
2212                for k in tbparams.keys():
2213                    # Create and start a thread to stop the segment
2214                    thread_pool.wait_for_slot()
2215                    uri, aid = tbparams[k]
2216                    t  = self.pooled_thread(\
2217                            target=self.terminate_segment(log=dealloc_log,
2218                                testbed=uri,
2219                                cert_file=self.cert_file, 
2220                                cert_pwd=self.cert_pwd,
2221                                trusted_certs=self.trusted_certs,
2222                                caller=self.call_TerminateSegment),
2223                            args=(uri, aid), name=k,
2224                            pdata=thread_pool, trace_file=self.trace_file)
2225                    t.start()
2226                # Wait for completions
2227                thread_pool.wait_for_all_done()
2228
2229            # release the allocations (failed experiments have done this
2230            # already, and starting experiments may be in odd states, so we
2231            # ignore errors releasing those allocations
2232            try: 
2233                for k in tbparams.keys():
2234                    # This releases access by uri
2235                    uri, aid = tbparams[k]
2236                    self.release_access(None, aid, uri=uri)
2237            except service_error, e:
2238                if status != 'failed' and not force:
2239                    raise e
2240
2241            # Remove the terminated experiment
2242            self.state_lock.acquire()
2243            for id in ids:
2244                if self.state.has_key(id): del self.state[id]
2245
2246            if self.state_filename: self.write_state()
2247            self.state_lock.release()
2248
2249            # Delete any synch points associated with this experiment.  All
2250            # synch points begin with the fedid of the experiment.
2251            fedid_keys = set(["fedid:%s" % f for f in ids \
2252                    if isinstance(f, fedid)])
2253            for k in self.synch_store.all_keys():
2254                try:
2255                    if len(k) > 45 and k[0:46] in fedid_keys:
2256                        self.synch_store.del_value(k)
2257                except synch_store.BadDeletionError:
2258                    pass
2259            self.write_store()
2260               
2261            return { 
2262                    'experiment': exp , 
2263                    'deallocationLog': "".join(dealloc_list),
2264                    }
2265        else:
2266            # Don't forget to release the lock
2267            self.state_lock.release()
2268            raise service_error(service_error.req, "No saved state")
2269
2270
2271    def GetValue(self, req, fid):
2272        """
2273        Get a value from the synchronized store
2274        """
2275        req = req.get('GetValueRequestBody', None)
2276        if not req:
2277            raise service_error(service_error.req,
2278                    "Bad request format (no GetValueRequestBody)")
2279       
2280        name = req['name']
2281        wait = req['wait']
2282        rv = { 'name': name }
2283
2284        if self.auth.check_attribute(fid, name):
2285            self.log.debug("[GetValue] asking for %s " % name)
2286            try:
2287                v = self.synch_store.get_value(name, wait)
2288            except synch_store.RevokedKeyError:
2289                # No more synch on this key
2290                raise service_error(service_error.federant, 
2291                        "Synch key %s revoked" % name)
2292            if v is not None:
2293                rv['value'] = v
2294            self.log.debug("[GetValue] got %s from %s" % (v, name))
2295            return rv
2296        else:
2297            raise service_error(service_error.access, "Access Denied")
2298       
2299
2300    def SetValue(self, req, fid):
2301        """
2302        Set a value in the synchronized store
2303        """
2304        req = req.get('SetValueRequestBody', None)
2305        if not req:
2306            raise service_error(service_error.req,
2307                    "Bad request format (no SetValueRequestBody)")
2308       
2309        name = req['name']
2310        v = req['value']
2311
2312        if self.auth.check_attribute(fid, name):
2313            try:
2314                self.synch_store.set_value(name, v)
2315                self.write_store()
2316                self.log.debug("[SetValue] set %s to %s" % (name, v))
2317            except synch_store.CollisionError:
2318                # Translate into a service_error
2319                raise service_error(service_error.req,
2320                        "Value already set: %s" %name)
2321            except synch_store.RevokedKeyError:
2322                # No more synch on this key
2323                raise service_error(service_error.federant, 
2324                        "Synch key %s revoked" % name)
2325            return { 'name': name, 'value': v }
2326        else:
2327            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.