source: fedd/federation/experiment_control.py @ a0119a1

compt_changes
Last change on this file since a0119a1 was a0119a1, checked in by Ted Faber <faber@…>, 13 years ago

Have experiment controllers share information about themselves

  • Property mode set to 100644
File size: 92.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131
132        dt = config.get("experiment_control", "direct_transit")
133        self.auth_type = config.get('experiment_control', 'auth_type') \
134                or 'legacy'
135        self.auth_dir = config.get('experiment_control', 'auth_dir')
136        # XXX: document this!
137        self.info_cache_limit = \
138                config.getint('experiment_control', 'info_cache', 600)
139        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
140        else: self.direct_transit = [ ]
141        # NB for internal master/slave ops, not experiment setup
142        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
143
144        self.overrides = set([])
145        ovr = config.get('experiment_control', 'overrides')
146        if ovr:
147            for o in ovr.split(","):
148                o = o.strip()
149                if o.startswith('fedid:'): o = o[len('fedid:'):]
150                self.overrides.add(fedid(hexstr=o))
151
152        self.state = { }
153        self.state_lock = Lock()
154        self.tclsh = "/usr/local/bin/otclsh"
155        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
156                config.get("experiment_control", "tcl_splitter",
157                        "/usr/testbed/lib/ns2ir/parse.tcl")
158        mapdb_file = config.get("experiment_control", "mapdb")
159        self.trace_file = sys.stderr
160
161        self.def_expstart = \
162                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
163                "/tmp/federate";
164        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
165                "FEDDIR/hosts";
166        self.def_gwstart = \
167                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
168                "/tmp/bridge.log";
169        self.def_mgwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
171                "/tmp/bridge.log";
172        self.def_gwimage = "FBSD61-TUNNEL2";
173        self.def_gwtype = "pc";
174        self.local_access = { }
175
176        if self.auth_type == 'legacy':
177            if auth:
178                self.auth = auth
179            else:
180                self.log.error( "[access]: No authorizer initialized, " +\
181                        "creating local one.")
182                auth = authorizer()
183            self.get_access = self.legacy_get_access
184        elif self.auth_type == 'abac':
185            self.auth = abac_authorizer(load=self.auth_dir)
186        else:
187            raise service_error(service_error.internal, 
188                    "Unknown auth_type: %s" % self.auth_type)
189
190        if mapdb_file:
191            self.read_mapdb(mapdb_file)
192        else:
193            self.log.warn("[experiment_control] No testbed map, using defaults")
194            self.tbmap = { 
195                    'deter':'https://users.isi.deterlab.net:23235',
196                    'emulab':'https://users.isi.deterlab.net:23236',
197                    'ucb':'https://users.isi.deterlab.net:23237',
198                    }
199
200        # Grab saved state.  OK to do this w/o locking because it's read only
201        # and only one thread should be in existence that can see self.state at
202        # this point.
203        if self.state_filename:
204            self.read_state()
205
206        if self.store_filename:
207            self.read_store()
208        else:
209            self.log.warning("No saved synch store")
210            self.synch_store = synch_store
211
212        # Dispatch tables
213        self.soap_services = {\
214                'New': soap_handler('New', self.new_experiment),
215                'Create': soap_handler('Create', self.create_experiment),
216                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
217                'Vis': soap_handler('Vis', self.get_vis),
218                'Info': soap_handler('Info', self.get_info),
219                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
220                'Operation': soap_handler('Operation', self.do_operation),
221                'Terminate': soap_handler('Terminate', 
222                    self.terminate_experiment),
223                'GetValue': soap_handler('GetValue', self.GetValue),
224                'SetValue': soap_handler('SetValue', self.SetValue),
225        }
226
227        self.xmlrpc_services = {\
228                'New': xmlrpc_handler('New', self.new_experiment),
229                'Create': xmlrpc_handler('Create', self.create_experiment),
230                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
231                'Vis': xmlrpc_handler('Vis', self.get_vis),
232                'Info': xmlrpc_handler('Info', self.get_info),
233                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
234                'Terminate': xmlrpc_handler('Terminate',
235                    self.terminate_experiment),
236                'Operation': xmlrpc_handler('Operation', self.do_operation),
237                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
238                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
239        }
240
241    # Call while holding self.state_lock
242    def write_state(self):
243        """
244        Write a new copy of experiment state after copying the existing state
245        to a backup.
246
247        State format is a simple pickling of the state dictionary.
248        """
249        if os.access(self.state_filename, os.W_OK):
250            copy_file(self.state_filename, \
251                    "%s.bak" % self.state_filename)
252        try:
253            f = open(self.state_filename, 'w')
254            pickle.dump(self.state, f)
255        except EnvironmentError, e:
256            self.log.error("Can't write file %s: %s" % \
257                    (self.state_filename, e))
258        except pickle.PicklingError, e:
259            self.log.error("Pickling problem: %s" % e)
260        except TypeError, e:
261            self.log.error("Pickling problem (TypeError): %s" % e)
262
263    @staticmethod
264    def get_alloc_ids(exp):
265        """
266        Used by read_store and read state.  This used to be worse.
267        """
268
269        return [ a.allocID for a in exp.get_all_allocations() ]
270
271
272    # Call while holding self.state_lock
273    def read_state(self):
274        """
275        Read a new copy of experiment state.  Old state is overwritten.
276
277        State format is a simple pickling of the state dictionary.
278        """
279       
280        try:
281            f = open(self.state_filename, "r")
282            self.state = pickle.load(f)
283            self.log.debug("[read_state]: Read state from %s" % \
284                    self.state_filename)
285        except EnvironmentError, e:
286            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
287                    % (self.state_filename, e))
288        except pickle.UnpicklingError, e:
289            self.log.warning(("[read_state]: No saved state: " + \
290                    "Unpickling failed: %s") % e)
291       
292        for s in self.state.values():
293            try:
294
295                eid = s.fedid
296                if eid : 
297                    if self.auth_type == 'legacy':
298                        # XXX: legacy
299                        # Give the owner rights to the experiment
300                        #self.auth.set_attribute(s['owner'], eid)
301                        # And holders of the eid as well
302                        self.auth.set_attribute(eid, eid)
303                        # allow overrides to control experiments as well
304                        for o in self.overrides:
305                            self.auth.set_attribute(o, eid)
306                        # Set permissions to allow reading of the software
307                        # repo, if any, as well.
308                        for a in self.get_alloc_ids(s):
309                            self.auth.set_attribute(a, 'repo/%s' % eid)
310                else:
311                    raise KeyError("No experiment id")
312            except KeyError, e:
313                self.log.warning("[read_state]: State ownership or identity " +\
314                        "misformatted in %s: %s" % (self.state_filename, e))
315
316    def read_mapdb(self, file):
317        """
318        Read a simple colon separated list of mappings for the
319        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
320        also adds testbeds to active if they include , active after
321        their name.
322        """
323
324        self.tbmap = { }
325        self.tbactive = set()
326        lineno =0
327        try:
328            f = open(file, "r")
329            for line in f:
330                lineno += 1
331                line = line.strip()
332                if line.startswith('#') or len(line) == 0:
333                    continue
334                try:
335                    label, url = line.split(':', 1)
336                    if ',' in label:
337                        label, act = label.split(',', 1)
338                        active = (act.strip() == 'active')
339                    else:
340                        active = False
341                    self.tbmap[label] = url
342                    if active: self.tbactive.add(label)
343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
346        except EnvironmentError, e:
347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
349        else:
350            f.close()
351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
358        except EnvironmentError, e:
359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
391        except EnvironmentError, e:
392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
397
398    def remove_dirs(self, dir):
399        """
400        Remove the directory tree and all files rooted at dir.  Log any errors,
401        but continue.
402        """
403        self.log.debug("[removedirs]: removing %s" % dir)
404        try:
405            for path, dirs, files in os.walk(dir, topdown=False):
406                for f in files:
407                    os.remove(os.path.join(path, f))
408                for d in dirs:
409                    os.rmdir(os.path.join(path, d))
410            os.rmdir(dir)
411        except EnvironmentError, e:
412            self.log.error("Error deleting directory tree in %s" % e);
413
414    @staticmethod
415    def make_temp_certfile(expcert, tmpdir):
416        """
417        make a protected copy of the access certificate so the experiment
418        controller can act as the experiment principal.  mkstemp is the most
419        secure way to do that. The directory should be created by
420        mkdtemp.  Return the filename.
421        """
422        if expcert and tmpdir:
423            try:
424                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
425                f = os.fdopen(certf, 'w')
426                print >> f, expcert
427                f.close()
428            except EnvironmentError, e:
429                raise service_error(service_error.internal, 
430                        "Cannot create temp cert file?")
431            return certfn
432        else:
433            return None
434
435       
436    def generate_ssh_keys(self, dest, type="rsa" ):
437        """
438        Generate a set of keys for the gateways to use to talk.
439
440        Keys are of type type and are stored in the required dest file.
441        """
442        valid_types = ("rsa", "dsa")
443        t = type.lower();
444        if t not in valid_types: raise ValueError
445        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
446
447        try:
448            trace = open("/dev/null", "w")
449        except EnvironmentError:
450            raise service_error(service_error.internal,
451                    "Cannot open /dev/null??");
452
453        # May raise CalledProcessError
454        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
455        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
456        if rv != 0:
457            raise service_error(service_error.internal, 
458                    "Cannot generate nonce ssh keys.  %s return code %d" \
459                            % (self.ssh_keygen, rv))
460
461    def generate_seer_certs(self, destdir):
462        '''
463        Create a SEER ca cert and a node cert in destdir/ca.pem and
464        destdir/node.pem respectively.  These will be distributed throughout
465        the federated experiment.  This routine reports errors via
466        service_errors.
467        '''
468        openssl = '/usr/bin/openssl'
469        # All the filenames and parameters we need for openssl calls below
470        ca_key =os.path.join(destdir, 'ca.key') 
471        ca_pem = os.path.join(destdir, 'ca.pem')
472        node_key =os.path.join(destdir, 'node.key') 
473        node_pem = os.path.join(destdir, 'node.pem')
474        node_req = os.path.join(destdir, 'node.req')
475        node_signed = os.path.join(destdir, 'node.signed')
476        days = '%s' % (365 * 10)
477        serial = '%s' % random.randint(0, 1<<16)
478
479        try:
480            # Sequence of calls to create a CA key, create a ca cert, create a
481            # node key, node signing request, and finally a signed node
482            # certificate.
483            sequence = (
484                    (openssl, 'genrsa', '-out', ca_key, '1024'),
485                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
486                        ca_pem, '-days', days, '-subj', 
487                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
488                    (openssl, 'genrsa', '-out', node_key, '1024'),
489                    (openssl, 'req', '-new', '-key', node_key, '-out', 
490                        node_req, '-days', days, '-subj', 
491                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
492                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
493                        '-set_serial', serial, '-req', '-in', node_req, 
494                        '-out', node_signed, '-days', days),
495                )
496            # Do all that stuff; bail if there's an error, and push all the
497            # output to dev/null.
498            for cmd in sequence:
499                trace = open("/dev/null", "w")
500                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
501                if rv != 0:
502                    raise service_error(service_error.internal, 
503                            "Cannot generate SEER certs.  %s return code %d" \
504                                    % (' '.join(cmd), rv))
505            # Concatinate the node key and signed certificate into node.pem
506            f = open(node_pem, 'w')
507            for comp in (node_signed, node_key):
508                g = open(comp, 'r')
509                f.write(g.read())
510                g.close()
511            f.close()
512
513            # Throw out intermediaries.
514            for fn in (ca_key, node_key, node_req, node_signed):
515                os.unlink(fn)
516
517        except EnvironmentError, e:
518            # Any difficulties with the file system wind up here
519            raise service_error(service_error.internal,
520                    "File error on  %s while creating SEER certs: %s" % \
521                            (e.filename, e.strerror))
522
523
524
525    def gentopo(self, str):
526        """
527        Generate the topology data structure from the splitter's XML
528        representation of it.
529
530        The topology XML looks like:
531            <experiment>
532                <nodes>
533                    <node><vname></vname><ips>ip1:ip2</ips></node>
534                </nodes>
535                <lans>
536                    <lan>
537                        <vname></vname><vnode></vnode><ip></ip>
538                        <bandwidth></bandwidth><member>node:port</member>
539                    </lan>
540                </lans>
541        """
542        class topo_parse:
543            """
544            Parse the topology XML and create the dats structure.
545            """
546            def __init__(self):
547                # Typing of the subelements for data conversion
548                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
549                self.int_subelements = ( 'bandwidth',)
550                self.float_subelements = ( 'delay',)
551                # The final data structure
552                self.nodes = [ ]
553                self.lans =  [ ]
554                self.topo = { \
555                        'node': self.nodes,\
556                        'lan' : self.lans,\
557                    }
558                self.element = { }  # Current element being created
559                self.chars = ""     # Last text seen
560
561            def end_element(self, name):
562                # After each sub element the contents is added to the current
563                # element or to the appropriate list.
564                if name == 'node':
565                    self.nodes.append(self.element)
566                    self.element = { }
567                elif name == 'lan':
568                    self.lans.append(self.element)
569                    self.element = { }
570                elif name in self.str_subelements:
571                    self.element[name] = self.chars
572                    self.chars = ""
573                elif name in self.int_subelements:
574                    self.element[name] = int(self.chars)
575                    self.chars = ""
576                elif name in self.float_subelements:
577                    self.element[name] = float(self.chars)
578                    self.chars = ""
579
580            def found_chars(self, data):
581                self.chars += data.rstrip()
582
583
584        tp = topo_parse();
585        parser = xml.parsers.expat.ParserCreate()
586        parser.EndElementHandler = tp.end_element
587        parser.CharacterDataHandler = tp.found_chars
588
589        parser.Parse(str)
590
591        return tp.topo
592       
593
594    def genviz(self, topo):
595        """
596        Generate the visualization the virtual topology
597        """
598
599        neato = "/usr/local/bin/neato"
600        # These are used to parse neato output and to create the visualization
601        # file.
602        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
603        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
604                "%s</type></node>"
605
606        try:
607            # Node names
608            nodes = [ n['vname'] for n in topo['node'] ]
609            topo_lans = topo['lan']
610        except KeyError, e:
611            raise service_error(service_error.internal, "Bad topology: %s" %e)
612
613        lans = { }
614        links = { }
615
616        # Walk through the virtual topology, organizing the connections into
617        # 2-node connections (links) and more-than-2-node connections (lans).
618        # When a lan is created, it's added to the list of nodes (there's a
619        # node in the visualization for the lan).
620        for l in topo_lans:
621            if links.has_key(l['vname']):
622                if len(links[l['vname']]) < 2:
623                    links[l['vname']].append(l['vnode'])
624                else:
625                    nodes.append(l['vname'])
626                    lans[l['vname']] = links[l['vname']]
627                    del links[l['vname']]
628                    lans[l['vname']].append(l['vnode'])
629            elif lans.has_key(l['vname']):
630                lans[l['vname']].append(l['vnode'])
631            else:
632                links[l['vname']] = [ l['vnode'] ]
633
634
635        # Open up a temporary file for dot to turn into a visualization
636        try:
637            df, dotname = tempfile.mkstemp()
638            dotfile = os.fdopen(df, 'w')
639        except EnvironmentError:
640            raise service_error(service_error.internal,
641                    "Failed to open file in genviz")
642
643        try:
644            dnull = open('/dev/null', 'w')
645        except EnvironmentError:
646            service_error(service_error.internal,
647                    "Failed to open /dev/null in genviz")
648
649        # Generate a dot/neato input file from the links, nodes and lans
650        try:
651            print >>dotfile, "graph G {"
652            for n in nodes:
653                print >>dotfile, '\t"%s"' % n
654            for l in links.keys():
655                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
656            for l in lans.keys():
657                for n in lans[l]:
658                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
659            print >>dotfile, "}"
660            dotfile.close()
661        except TypeError:
662            raise service_error(service_error.internal,
663                    "Single endpoint link in vtopo")
664        except EnvironmentError:
665            raise service_error(service_error.internal, "Cannot write dot file")
666
667        # Use dot to create a visualization
668        try:
669            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
670                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
671                stderr=dnull, close_fds=True)
672        except EnvironmentError:
673            raise service_error(service_error.internal, 
674                    "Cannot generate visualization: is graphviz available?")
675        dnull.close()
676
677        # Translate dot to vis format
678        vis_nodes = [ ]
679        vis = { 'node': vis_nodes }
680        for line in dot.stdout:
681            m = vis_re.match(line)
682            if m:
683                vn = m.group(1)
684                vis_node = {'name': vn, \
685                        'x': float(m.group(2)),\
686                        'y' : float(m.group(3)),\
687                    }
688                if vn in links.keys() or vn in lans.keys():
689                    vis_node['type'] = 'lan'
690                else:
691                    vis_node['type'] = 'node'
692                vis_nodes.append(vis_node)
693        rv = dot.wait()
694
695        os.remove(dotname)
696        # XXX: graphviz seems to use low return codes for warnings, like
697        # "couldn't find font"
698        if rv < 2 : return vis
699        else: return None
700
701
702    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
703            cert_pwd=None):
704        """
705        Release access to testbed through fedd
706        """
707
708        if not uri and tbmap:
709            uri = tbmap.get(tb, None)
710        if not uri:
711            raise service_error(service_error.server_config, 
712                    "Unknown testbed: %s" % tb)
713
714        if self.local_access.has_key(uri):
715            resp = self.local_access[uri].ReleaseAccess(\
716                    { 'ReleaseAccessRequestBody' : 
717                        {'allocID': {'fedid': aid}},}, 
718                    fedid(file=cert_file))
719            resp = { 'ReleaseAccessResponseBody': resp } 
720        else:
721            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
722                    cert_file, cert_pwd, self.trusted_certs)
723
724        # better error coding
725
726    def remote_ns2topdl(self, uri, desc):
727
728        req = {
729                'description' : { 'ns2description': desc },
730            }
731
732        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
733                self.trusted_certs)
734
735        if r.has_key('Ns2TopdlResponseBody'):
736            r = r['Ns2TopdlResponseBody']
737            ed = r.get('experimentdescription', None)
738            if ed.has_key('topdldescription'):
739                return topdl.Topology(**ed['topdldescription'])
740            else:
741                raise service_error(service_error.protocol, 
742                        "Bad splitter response (no output)")
743        else:
744            raise service_error(service_error.protocol, "Bad splitter response")
745
746    class start_segment:
747        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
748                cert_pwd=None, trusted_certs=None, caller=None,
749                log_collector=None):
750            self.log = log
751            self.debug = debug
752            self.cert_file = cert_file
753            self.cert_pwd = cert_pwd
754            self.trusted_certs = None
755            self.caller = caller
756            self.testbed = testbed
757            self.log_collector = log_collector
758            self.response = None
759            self.node = { }
760            self.subs = { }
761            self.proof = None
762
763        def make_map(self, resp):
764            if 'segmentdescription' not in resp  or \
765                    'topdldescription' not in resp['segmentdescription']:
766                self.log.warn('No topology returned from startsegment')
767                return 
768
769            top = topdl.Topology(
770                    **resp['segmentdescription']['topdldescription'])
771
772            for e in top.elements:
773                if isinstance(e, topdl.Computer):
774                    self.node[e.name] = e
775            for s in top.substrates:
776                self.subs[s.name] = s
777
778        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
779            req = {
780                    'allocID': { 'fedid' : aid }, 
781                    'segmentdescription': { 
782                        'topdldescription': topo.to_dict(),
783                    },
784                }
785
786            if connInfo:
787                req['connection'] = connInfo
788
789            import_svcs = [ s for m in masters.values() \
790                    for s in m if self.testbed in s.importers]
791
792            if import_svcs or self.testbed in masters:
793                req['service'] = []
794
795            for s in import_svcs:
796                for r in s.reqs:
797                    sr = copy.deepcopy(r)
798                    sr['visibility'] = 'import';
799                    req['service'].append(sr)
800
801            for s in masters.get(self.testbed, []):
802                for r in s.reqs:
803                    sr = copy.deepcopy(r)
804                    sr['visibility'] = 'export';
805                    req['service'].append(sr)
806
807            if attrs:
808                req['fedAttr'] = attrs
809
810            try:
811                self.log.debug("Calling StartSegment at %s " % uri)
812                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
813                        self.trusted_certs)
814                if r.has_key('StartSegmentResponseBody'):
815                    lval = r['StartSegmentResponseBody'].get('allocationLog',
816                            None)
817                    if lval and self.log_collector:
818                        for line in  lval.splitlines(True):
819                            self.log_collector.write(line)
820                    self.make_map(r['StartSegmentResponseBody'])
821                    if 'proof' in r: self.proof = r['proof']
822                    self.response = r
823                else:
824                    raise service_error(service_error.internal, 
825                            "Bad response!?: %s" %r)
826                return True
827            except service_error, e:
828                self.log.error("Start segment failed on %s: %s" % \
829                        (self.testbed, e))
830                return False
831
832
833
834    class terminate_segment:
835        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
836                cert_pwd=None, trusted_certs=None, caller=None):
837            self.log = log
838            self.debug = debug
839            self.cert_file = cert_file
840            self.cert_pwd = cert_pwd
841            self.trusted_certs = None
842            self.caller = caller
843            self.testbed = testbed
844
845        def __call__(self, uri, aid ):
846            req = {
847                    'allocID': {'fedid': aid }, 
848                }
849            self.log.info("Calling terminate segment")
850            try:
851                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
852                        self.trusted_certs)
853                self.log.info("Terminate segment succeeded")
854                return True
855            except service_error, e:
856                self.log.error("Terminate segment failed on %s: %s" % \
857                        (self.testbed, e))
858                return False
859
860    class info_segment(start_segment):
861        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
862                cert_pwd=None, trusted_certs=None, caller=None,
863                log_collector=None):
864            experiment_control_local.start_segment.__init__(self, debug, 
865                    log, testbed, cert_file, cert_pwd, trusted_certs, 
866                    caller, log_collector)
867
868        def __call__(self, uri, aid):
869            req = { 'allocID': { 'fedid' : aid } }
870
871            try:
872                self.log.debug("Calling InfoSegment at %s " % uri)
873                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
874                        self.trusted_certs)
875                if r.has_key('InfoSegmentResponseBody'):
876                    self.make_map(r['InfoSegmentResponseBody'])
877                    if 'proof' in r: self.proof = r['proof']
878                    self.response = r
879                else:
880                    raise service_error(service_error.internal, 
881                            "Bad response!?: %s" %r)
882                return True
883            except service_error, e:
884                self.log.error("Info segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888    class operation_segment:
889        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
890                cert_pwd=None, trusted_certs=None, caller=None,
891                log_collector=None):
892            self.log = log
893            self.debug = debug
894            self.cert_file = cert_file
895            self.cert_pwd = cert_pwd
896            self.trusted_certs = None
897            self.caller = caller
898            self.testbed = testbed
899            self.status = None
900
901        def __call__(self, uri, aid, op, targets, params):
902            req = { 
903                    'allocID': { 'fedid' : aid },
904                    'operation': op,
905                    'target': targets,
906                    }
907            if params: req['parameter'] = params
908
909
910            try:
911                self.log.debug("Calling OperationSegment at %s " % uri)
912                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
913                        self.trusted_certs)
914                if 'OperationSegmentResponseBody' in r:
915                    r = r['OperationSegmentResponseBody']
916                    if 'status' in r:
917                        self.status = r['status']
918                else:
919                    raise service_error(service_error.internal, 
920                            "Bad response!?: %s" %r)
921                return True
922            except service_error, e:
923                self.log.error("Operation segment failed on %s: %s" % \
924                        (self.testbed, e))
925                return False
926
927    def annotate_topology(self, top, data):
928        # These routines do various parts of the annotation
929        def add_new_names(nl, l):
930            """ add any names in nl to the list in l """
931            for n in nl:
932                if n not in l: l.append(n)
933       
934        def merge_services(ne, e):
935            for ns in ne.service:
936                # NB: the else is on the for
937                for s in e.service:
938                    if ns.name == s.name:
939                        s.importer = ns.importer
940                        s.param = ns.param
941                        s.description = ns.description
942                        s.status = ns.status
943                        break
944                else:
945                    e.service.append(ns)
946       
947        def merge_oses(ne, e):
948            """
949            Merge the operating system entries of ne into e
950            """
951            for nos in ne.os:
952                # NB: the else is on the for
953                for os in e.os:
954                    if nos.name == os.name:
955                        os.version = nos.version
956                        os.version = nos.distribution
957                        os.version = nos.distributionversion
958                        for a in nos.attribute:
959                            if os.get_attribute(a.attribute):
960                                os.remove_attribute(a.attribute)
961                            os.set_attribute(a.attribute, a.value)
962                        break
963                else:
964                    # If both nodes have one OS, this is a replacement
965                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
966                    else: e.os.append(nos)
967
968        # Annotate the topology with embedding info
969        for e in top.elements:
970            if isinstance(e, topdl.Computer):
971                for s in data:
972                    ne = s.node.get(e.name, None)
973                    if ne is not None:
974                        add_new_names(ne.localname, e.localname)
975                        e.status = ne.status
976                        merge_services(ne, e)
977                        add_new_names(ne.operation, e.operation)
978                        if ne.os: merge_oses(ne, e)
979                        break
980        # Annotate substrates
981        for s in top.substrates:
982            for d in data:
983                ss = d.subs.get(s.name, None)
984                if ss is not None:
985                    if ss.capacity is not None:
986                        s.capacity = ss.capacity
987                    if s.latency is not None:
988                        s.latency = ss.latency
989
990
991
992    def allocate_resources(self, allocated, masters, eid, expid, 
993            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
994            attrs=None, connInfo={}, tbmap=None, expcert=None):
995
996        started = { }           # Testbeds where a sub-experiment started
997                                # successfully
998
999        # XXX
1000        fail_soft = False
1001
1002        if tbmap is None: tbmap = { }
1003
1004        log = alloc_log or self.log
1005
1006        tp = thread_pool(self.nthreads)
1007        threads = [ ]
1008        starters = [ ]
1009
1010        if expcert:
1011            cert = expcert
1012            pw = None
1013        else:
1014            cert = self.cert_file
1015            pw = self.cert_pwd
1016
1017        for tb in allocated.keys():
1018            # Create and start a thread to start the segment, and save it
1019            # to get the return value later
1020            tb_attrs = copy.copy(attrs)
1021            tp.wait_for_slot()
1022            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1023            base, suffix = split_testbed(tb)
1024            if suffix:
1025                tb_attrs.append({'attribute': 'experiment_name', 
1026                    'value': "%s-%s" % (eid, suffix)})
1027            else:
1028                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1029            if not uri:
1030                raise service_error(service_error.internal, 
1031                        "Unknown testbed %s !?" % tb)
1032
1033            aid = tbparams[tb].allocID
1034            if not aid:
1035                raise service_error(service_error.internal, 
1036                        "No alloc id for testbed %s !?" % tb)
1037
1038            s = self.start_segment(log=log, debug=self.debug,
1039                    testbed=tb, cert_file=cert,
1040                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1041                    caller=self.call_StartSegment,
1042                    log_collector=log_collector)
1043            starters.append(s)
1044            t  = pooled_thread(\
1045                    target=s, name=tb,
1046                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1047                    pdata=tp, trace_file=self.trace_file)
1048            threads.append(t)
1049            t.start()
1050
1051        # Wait until all finish (keep pinging the log, though)
1052        mins = 0
1053        revoked = False
1054        while not tp.wait_for_all_done(60.0):
1055            mins += 1
1056            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1057                    % mins)
1058            if not revoked and \
1059                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1060                # a testbed has failed.  Revoke this experiment's
1061                # synchronizarion values so that sub experiments will not
1062                # deadlock waiting for synchronization that will never happen
1063                self.log.info("A subexperiment has failed to swap in, " + \
1064                        "revoking synch keys")
1065                var_key = "fedid:%s" % expid
1066                for k in self.synch_store.all_keys():
1067                    if len(k) > 45 and k[0:46] == var_key:
1068                        self.synch_store.revoke_key(k)
1069                revoked = True
1070
1071        failed = [ t.getName() for t in threads if not t.rv ]
1072        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1073
1074        # If one failed clean up, unless fail_soft is set
1075        if failed:
1076            if not fail_soft:
1077                tp.clear()
1078                for tb in succeeded:
1079                    # Create and start a thread to stop the segment
1080                    tp.wait_for_slot()
1081                    uri = tbparams[tb].uri
1082                    t  = pooled_thread(\
1083                            target=self.terminate_segment(log=log,
1084                                testbed=tb,
1085                                cert_file=cert, 
1086                                cert_pwd=pw,
1087                                trusted_certs=self.trusted_certs,
1088                                caller=self.call_TerminateSegment),
1089                            args=(uri, tbparams[tb].allocID),
1090                            name=tb,
1091                            pdata=tp, trace_file=self.trace_file)
1092                    t.start()
1093                # Wait until all finish (if any are being stopped)
1094                if succeeded:
1095                    tp.wait_for_all_done()
1096
1097                # release the allocations
1098                for tb in tbparams.keys():
1099                    try:
1100                        self.release_access(tb, tbparams[tb].allocID, 
1101                                tbmap=tbmap, uri=tbparams[tb].uri,
1102                                cert_file=cert, cert_pwd=pw)
1103                    except service_error, e:
1104                        self.log.warn("Error releasing access: %s" % e.desc)
1105                # Remove the placeholder
1106                self.state_lock.acquire()
1107                self.state[eid].status = 'failed'
1108                self.state[eid].updated()
1109                if self.state_filename: self.write_state()
1110                self.state_lock.release()
1111                # Remove the repo dir
1112                self.remove_dirs("%s/%s" %(self.repodir, expid))
1113                # Walk up tmpdir, deleting as we go
1114                if self.cleanup:
1115                    self.remove_dirs(tmpdir)
1116                else:
1117                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1118
1119
1120                log.error("Swap in failed on %s" % ",".join(failed))
1121                return
1122        else:
1123            # Walk through the successes and gather the proofs
1124            proofs = { }
1125            for s in starters:
1126                if s.proof:
1127                    proofs[s.testbed] = s.proof
1128            self.annotate_topology(top, starters)
1129            log.info("[start_segment]: Experiment %s active" % eid)
1130
1131
1132        # Walk up tmpdir, deleting as we go
1133        if self.cleanup:
1134            self.remove_dirs(tmpdir)
1135        else:
1136            log.debug("[start_experiment]: not removing %s" % tmpdir)
1137
1138        # Insert the experiment into our state and update the disk copy.
1139        self.state_lock.acquire()
1140        self.state[expid].status = 'active'
1141        self.state[eid] = self.state[expid]
1142        self.state[eid].top = top
1143        self.state[eid].updated()
1144        # Append startup proofs
1145        for f in self.state[eid].get_all_allocations():
1146            if f.tb in proofs:
1147                f.proof.append(proofs[f.tb])
1148
1149        if self.state_filename: self.write_state()
1150        self.state_lock.release()
1151        return
1152
1153
1154    def add_kit(self, e, kit):
1155        """
1156        Add a Software object created from the list of (install, location)
1157        tuples passed as kit  to the software attribute of an object e.  We
1158        do this enough to break out the code, but it's kind of a hack to
1159        avoid changing the old tuple rep.
1160        """
1161
1162        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1163
1164        if isinstance(e.software, list): e.software.extend(s)
1165        else: e.software = s
1166
1167    def append_experiment_authorization(self, expid, attrs, 
1168            need_state_lock=True):
1169        """
1170        Append the authorization information to system state
1171        """
1172
1173        for p, a in attrs:
1174            self.auth.set_attribute(p, a)
1175        self.auth.save()
1176
1177        if need_state_lock: self.state_lock.acquire()
1178        # XXX: really a no op?
1179        #self.state[expid]['auth'].update(attrs)
1180        if self.state_filename: self.write_state()
1181        if need_state_lock: self.state_lock.release()
1182
1183    def clear_experiment_authorization(self, expid, need_state_lock=True):
1184        """
1185        Attrs is a set of attribute principal pairs that need to be removed
1186        from the authenticator.  Remove them and save the authenticator.
1187        """
1188
1189        if need_state_lock: self.state_lock.acquire()
1190        # XXX: should be a no-op
1191        #if expid in self.state and 'auth' in self.state[expid]:
1192            #for p, a in self.state[expid]['auth']:
1193                #self.auth.unset_attribute(p, a)
1194            #self.state[expid]['auth'] = set()
1195        if self.state_filename: self.write_state()
1196        if need_state_lock: self.state_lock.release()
1197        self.auth.save()
1198
1199
1200    def create_experiment_state(self, fid, req, expid, expcert,
1201            state='starting'):
1202        """
1203        Create the initial entry in the experiment's state.  The expid and
1204        expcert are the experiment's fedid and certifacte that represents that
1205        ID, which are installed in the experiment state.  If the request
1206        includes a suggested local name that is used if possible.  If the local
1207        name is already taken by an experiment owned by this user that has
1208        failed, it is overwritten.  Otherwise new letters are added until a
1209        valid localname is found.  The generated local name is returned.
1210        """
1211
1212        if req.has_key('experimentID') and \
1213                req['experimentID'].has_key('localname'):
1214            overwrite = False
1215            eid = req['experimentID']['localname']
1216            # If there's an old failed experiment here with the same local name
1217            # and accessible by this user, we'll overwrite it, otherwise we'll
1218            # fall through and do the collision avoidance.
1219            old_expid = self.get_experiment_fedid(eid)
1220            if old_expid:
1221                users_experiment = True
1222                try:
1223                    self.check_experiment_access(fid, old_expid)
1224                except service_error, e:
1225                    if e.code == service_error.access: users_experiment = False
1226                    else: raise e
1227                if users_experiment:
1228                    self.state_lock.acquire()
1229                    status = self.state[eid].status
1230                    if status and status == 'failed':
1231                        # remove the old access attributes
1232                        self.clear_experiment_authorization(eid,
1233                                need_state_lock=False)
1234                        overwrite = True
1235                        del self.state[eid]
1236                        del self.state[old_expid]
1237                    self.state_lock.release()
1238                else:
1239                    self.log.info('Experiment %s exists, ' % eid + \
1240                            'but this user cannot access it')
1241            self.state_lock.acquire()
1242            while (self.state.has_key(eid) and not overwrite):
1243                eid += random.choice(string.ascii_letters)
1244            # Initial state
1245            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1246                    identity=expcert)
1247            self.state[expid] = self.state[eid]
1248            if self.state_filename: self.write_state()
1249            self.state_lock.release()
1250        else:
1251            eid = self.exp_stem
1252            for i in range(0,5):
1253                eid += random.choice(string.ascii_letters)
1254            self.state_lock.acquire()
1255            while (self.state.has_key(eid)):
1256                eid = self.exp_stem
1257                for i in range(0,5):
1258                    eid += random.choice(string.ascii_letters)
1259            # Initial state
1260            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1261                    identity=expcert)
1262            self.state[expid] = self.state[eid]
1263            if self.state_filename: self.write_state()
1264            self.state_lock.release()
1265
1266        # Let users touch the state.  Authorize this fid and the expid itself
1267        # to touch the experiment, as well as allowing th eoverrides.
1268        self.append_experiment_authorization(eid, 
1269                set([(fid, expid), (expid,expid)] + \
1270                        [ (o, expid) for o in self.overrides]))
1271
1272        return eid
1273
1274
1275    def allocate_ips_to_topo(self, top):
1276        """
1277        Add an ip4_address attribute to all the hosts in the topology, based on
1278        the shared substrates on which they sit.  An /etc/hosts file is also
1279        created and returned as a list of hostfiles entries.  We also return
1280        the allocator, because we may need to allocate IPs to portals
1281        (specifically DRAGON portals).
1282        """
1283        subs = sorted(top.substrates, 
1284                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1285                reverse=True)
1286        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1287        ifs = { }
1288        hosts = [ ]
1289
1290        for idx, s in enumerate(subs):
1291            net_size = len(s.interfaces)+2
1292
1293            a = ips.allocate(net_size)
1294            if a :
1295                base, num = a
1296                if num < net_size: 
1297                    raise service_error(service_error.internal,
1298                            "Allocator returned wrong number of IPs??")
1299            else:
1300                raise service_error(service_error.req, 
1301                        "Cannot allocate IP addresses")
1302            mask = ips.min_alloc
1303            while mask < net_size:
1304                mask *= 2
1305
1306            netmask = ((2**32-1) ^ (mask-1))
1307
1308            base += 1
1309            for i in s.interfaces:
1310                i.attribute.append(
1311                        topdl.Attribute('ip4_address', 
1312                            "%s" % ip_addr(base)))
1313                i.attribute.append(
1314                        topdl.Attribute('ip4_netmask', 
1315                            "%s" % ip_addr(int(netmask))))
1316
1317                hname = i.element.name
1318                if ifs.has_key(hname):
1319                    hosts.append("%s\t%s-%s %s-%d" % \
1320                            (ip_addr(base), hname, s.name, hname,
1321                                ifs[hname]))
1322                else:
1323                    ifs[hname] = 0
1324                    hosts.append("%s\t%s-%s %s-%d %s" % \
1325                            (ip_addr(base), hname, s.name, hname,
1326                                ifs[hname], hname))
1327
1328                ifs[hname] += 1
1329                base += 1
1330        return hosts, ips
1331
1332    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1333            tbparam, masters, tbmap, expid=None, expcert=None):
1334        for tb in testbeds:
1335            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1336                    expcert)
1337            allocated[tb] = 1
1338
1339    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1340            expcert=None):
1341        """
1342        Get access to testbed through fedd and set the parameters for that tb
1343        """
1344        def get_export_project(svcs):
1345            """
1346            Look through for the list of federated_service for this testbed
1347            objects for a project_export service, and extract the project
1348            parameter.
1349            """
1350
1351            pe = [s for s in svcs if s.name=='project_export']
1352            if len(pe) == 1:
1353                return pe[0].params.get('project', None)
1354            elif len(pe) == 0:
1355                return None
1356            else:
1357                raise service_error(service_error.req,
1358                        "More than one project export is not supported")
1359
1360        def add_services(svcs, type, slist, keys):
1361            """
1362            Add the given services to slist.  type is import or export.  Also
1363            add a mapping entry from the assigned id to the original service
1364            record.
1365            """
1366            for i, s in enumerate(svcs):
1367                idx = '%s%d' % (type, i)
1368                keys[idx] = s
1369                sr = {'id': idx, 'name': s.name, 'visibility': type }
1370                if s.params:
1371                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1372                            for k, v in s.params.items()]
1373                slist.append(sr)
1374
1375        uri = tbmap.get(testbed_base(tb), None)
1376        if not uri:
1377            raise service_error(service_error.server_config, 
1378                    "Unknown testbed: %s" % tb)
1379
1380        export_svcs = masters.get(tb,[])
1381        import_svcs = [ s for m in masters.values() \
1382                for s in m \
1383                    if tb in s.importers ]
1384
1385        export_project = get_export_project(export_svcs)
1386        # Compose the credential list so that IDs come before attributes
1387        creds = set()
1388        keys = set()
1389        certs = self.auth.get_creds_for_principal(fid)
1390        # Append credenials about this experiment controller - e.g. that it is
1391        # trusted.
1392        certs.update(self.auth.get_creds_for_principal(
1393            fedid(file=self.cert_file)))
1394        if expid:
1395            certs.update(self.auth.get_creds_for_principal(expid))
1396        for c in certs:
1397            keys.add(c.issuer_cert())
1398            creds.add(c.attribute_cert())
1399        creds = list(keys) + list(creds)
1400
1401        if expcert: cert, pw = expcert, None
1402        else: cert, pw = self.cert_file, self.cert_pw
1403
1404        # Request credentials
1405        req = {
1406                'abac_credential': creds,
1407            }
1408        # Make the service request from the services we're importing and
1409        # exporting.  Keep track of the export request ids so we can
1410        # collect the resulting info from the access response.
1411        e_keys = { }
1412        if import_svcs or export_svcs:
1413            slist = []
1414            add_services(import_svcs, 'import', slist, e_keys)
1415            add_services(export_svcs, 'export', slist, e_keys)
1416            req['service'] = slist
1417
1418        if self.local_access.has_key(uri):
1419            # Local access call
1420            req = { 'RequestAccessRequestBody' : req }
1421            r = self.local_access[uri].RequestAccess(req, 
1422                    fedid(file=self.cert_file))
1423            r = { 'RequestAccessResponseBody' : r }
1424        else:
1425            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1426
1427        if r.has_key('RequestAccessResponseBody'):
1428            # Through to here we have a valid response, not a fault.
1429            # Access denied is a fault, so something better or worse than
1430            # access denied has happened.
1431            r = r['RequestAccessResponseBody']
1432            self.log.debug("[get_access] Access granted")
1433        else:
1434            raise service_error(service_error.protocol,
1435                        "Bad proxy response")
1436        if 'proof' not in r:
1437            raise service_error(service_error.protocol,
1438                        "Bad access response (no access proof)")
1439
1440        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1441                tb=tb, uri=uri, proof=[r['proof']], 
1442                services=masters.get(tb, None))
1443
1444        # Collect the responses corresponding to the services this testbed
1445        # exports.  These will be the service requests that we will include in
1446        # the start segment requests (with appropriate visibility values) to
1447        # import and export the segments.
1448        for s in r.get('service', []):
1449            id = s.get('id', None)
1450            # Note that this attaches the response to the object in the masters
1451            # data structure.  (The e_keys index disappears when this fcn
1452            # returns)
1453            if id and id in e_keys:
1454                e_keys[id].reqs.append(s)
1455
1456        # Add attributes to parameter space.  We don't allow attributes to
1457        # overlay any parameters already installed.
1458        for a in r.get('fedAttr', []):
1459            try:
1460                if a['attribute']:
1461                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1462            except KeyError:
1463                self.log.error("Bad attribute in response: %s" % a)
1464
1465
1466    def split_topology(self, top, topo, testbeds):
1467        """
1468        Create the sub-topologies that are needed for experiment instantiation.
1469        """
1470        for tb in testbeds:
1471            topo[tb] = top.clone()
1472            # copy in for loop allows deletions from the original
1473            for e in [ e for e in topo[tb].elements]:
1474                etb = e.get_attribute('testbed')
1475                # NB: elements without a testbed attribute won't appear in any
1476                # sub topologies. 
1477                if not etb or etb != tb:
1478                    for i in e.interface:
1479                        for s in i.subs:
1480                            try:
1481                                s.interfaces.remove(i)
1482                            except ValueError:
1483                                raise service_error(service_error.internal,
1484                                        "Can't remove interface??")
1485                    topo[tb].elements.remove(e)
1486            topo[tb].make_indices()
1487
1488    def confirm_software(self, top):
1489        """
1490        Make sure that the software to be loaded in the topo is all available
1491        before we begin making access requests, etc.  This is a subset of
1492        wrangle_software.
1493        """
1494        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1495        pkgs.update([x.location for e in top.elements for x in e.software])
1496
1497        for pkg in pkgs:
1498            loc = pkg
1499
1500            scheme, host, path = urlparse(loc)[0:3]
1501            dest = os.path.basename(path)
1502            if not scheme:
1503                if not loc.startswith('/'):
1504                    loc = "/%s" % loc
1505                loc = "file://%s" %loc
1506            # NB: if scheme was found, loc == pkg
1507            try:
1508                u = urlopen(loc)
1509                u.close()
1510            except Exception, e:
1511                raise service_error(service_error.req, 
1512                        "Cannot open %s: %s" % (loc, e))
1513        return True
1514
1515    def wrangle_software(self, expid, top, topo, tbparams):
1516        """
1517        Copy software out to the repository directory, allocate permissions and
1518        rewrite the segment topologies to look for the software in local
1519        places.
1520        """
1521
1522        # Copy the rpms and tarfiles to a distribution directory from
1523        # which the federants can retrieve them
1524        linkpath = "%s/software" %  expid
1525        softdir ="%s/%s" % ( self.repodir, linkpath)
1526        softmap = { }
1527
1528        # self.fedkit and self.gateway kit are lists of tuples of
1529        # (install_location, download_location) this extracts the download
1530        # locations.
1531        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1532        pkgs.update([x.location for e in top.elements for x in e.software])
1533        try:
1534            os.makedirs(softdir)
1535        except EnvironmentError, e:
1536            raise service_error(
1537                    "Cannot create software directory: %s" % e)
1538        # The actual copying.  Everything's converted into a url for copying.
1539        auth_attrs = set()
1540        for pkg in pkgs:
1541            loc = pkg
1542
1543            scheme, host, path = urlparse(loc)[0:3]
1544            dest = os.path.basename(path)
1545            if not scheme:
1546                if not loc.startswith('/'):
1547                    loc = "/%s" % loc
1548                loc = "file://%s" %loc
1549            # NB: if scheme was found, loc == pkg
1550            try:
1551                u = urlopen(loc)
1552            except Exception, e:
1553                raise service_error(service_error.req, 
1554                        "Cannot open %s: %s" % (loc, e))
1555            try:
1556                f = open("%s/%s" % (softdir, dest) , "w")
1557                self.log.debug("Writing %s/%s" % (softdir,dest) )
1558                data = u.read(4096)
1559                while data:
1560                    f.write(data)
1561                    data = u.read(4096)
1562                f.close()
1563                u.close()
1564            except Exception, e:
1565                raise service_error(service_error.internal,
1566                        "Could not copy %s: %s" % (loc, e))
1567            path = re.sub("/tmp", "", linkpath)
1568            # XXX
1569            softmap[pkg] = \
1570                    "%s/%s/%s" %\
1571                    ( self.repo_url, path, dest)
1572
1573            # Allow the individual segments to access the software by assigning
1574            # an attribute to each testbed allocation that encodes the data to
1575            # be released.  This expression collects the data for each run of
1576            # the loop.
1577            auth_attrs.update([
1578                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1579                        for tb in tbparams.keys()])
1580
1581        self.append_experiment_authorization(expid, auth_attrs)
1582
1583        # Convert the software locations in the segments into the local
1584        # copies on this host
1585        for soft in [ s for tb in topo.values() \
1586                for e in tb.elements \
1587                    if getattr(e, 'software', False) \
1588                        for s in e.software ]:
1589            if softmap.has_key(soft.location):
1590                soft.location = softmap[soft.location]
1591
1592
1593    def new_experiment(self, req, fid):
1594        """
1595        The external interface to empty initial experiment creation called from
1596        the dispatcher.
1597
1598        Creates a working directory, splits the incoming description using the
1599        splitter script and parses out the avrious subsections using the
1600        lcasses above.  Once each sub-experiment is created, use pooled threads
1601        to instantiate them and start it all up.
1602        """
1603        self.log.info("New experiment call started for %s" % fid)
1604        req = req.get('NewRequestBody', None)
1605        if not req:
1606            raise service_error(service_error.req,
1607                    "Bad request format (no NewRequestBody)")
1608
1609        if self.auth.import_credentials(data_list=req.get('credential', [])):
1610            self.auth.save()
1611
1612        try:
1613            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1614                    with_proof=True)
1615        except service_error, e:
1616            self.log.info("New experiment call for %s: access denied" % fid)
1617            raise e
1618
1619
1620        if not access_ok:
1621            self.log.info("New experiment call for %s: Access denied" % fid)
1622            raise service_error(service_error.access, "New access denied",
1623                    proof=[proof])
1624
1625        try:
1626            tmpdir = tempfile.mkdtemp(prefix="split-")
1627        except EnvironmentError:
1628            raise service_error(service_error.internal, "Cannot create tmp dir")
1629
1630        # Generate an ID for the experiment (slice) and a certificate that the
1631        # allocator can use to prove they own it.  We'll ship it back through
1632        # the encrypted connection.  If the requester supplied one, use it.
1633        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1634            expcert = req['experimentAccess']['X509']
1635            expid = fedid(certstr=expcert)
1636            self.state_lock.acquire()
1637            if expid in self.state:
1638                self.state_lock.release()
1639                raise service_error(service_error.req, 
1640                        'fedid %s identifies an existing experiment' % expid)
1641            self.state_lock.release()
1642        else:
1643            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1644
1645        #now we're done with the tmpdir, and it should be empty
1646        if self.cleanup:
1647            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1648            os.rmdir(tmpdir)
1649        else:
1650            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1651
1652        eid = self.create_experiment_state(fid, req, expid, expcert, 
1653                state='empty')
1654
1655        rv = {
1656                'experimentID': [
1657                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1658                ],
1659                'experimentStatus': 'empty',
1660                'experimentAccess': { 'X509' : expcert },
1661                'proof': proof.to_dict(),
1662            }
1663
1664        self.log.info("New experiment call succeeded for %s" % fid)
1665        return rv
1666
1667    # create_experiment sub-functions
1668
1669    @staticmethod
1670    def get_experiment_key(req, field='experimentID'):
1671        """
1672        Parse the experiment identifiers out of the request (the request body
1673        tag has been removed).  Specifically this pulls either the fedid or the
1674        localname out of the experimentID field.  A fedid is preferred.  If
1675        neither is present or the request does not contain the fields,
1676        service_errors are raised.
1677        """
1678        # Get the experiment access
1679        exp = req.get(field, None)
1680        if exp:
1681            if exp.has_key('fedid'):
1682                key = exp['fedid']
1683            elif exp.has_key('localname'):
1684                key = exp['localname']
1685            else:
1686                raise service_error(service_error.req, "Unknown lookup type")
1687        else:
1688            raise service_error(service_error.req, "No request?")
1689
1690        return key
1691
1692    def get_experiment_ids_and_start(self, key, tmpdir):
1693        """
1694        Get the experiment name, id and access certificate from the state, and
1695        set the experiment state to 'starting'.  returns a triple (fedid,
1696        localname, access_cert_file). The access_cert_file is a copy of the
1697        contents of the access certificate, created in the tempdir with
1698        restricted permissions.  If things are confused, raise an exception.
1699        """
1700
1701        expid = eid = None
1702        self.state_lock.acquire()
1703        if key in self.state:
1704            exp = self.state[key]
1705            exp.status = "starting"
1706            exp.updated()
1707            expid = exp.fedid
1708            eid = exp.localname
1709            expcert = exp.identity
1710        self.state_lock.release()
1711
1712        # make a protected copy of the access certificate so the experiment
1713        # controller can act as the experiment principal.
1714        if expcert:
1715            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1716            if not expcert_file:
1717                raise service_error(service_error.internal, 
1718                        "Cannot create temp cert file?")
1719        else:
1720            expcert_file = None
1721
1722        return (eid, expid, expcert_file)
1723
1724    def get_topology(self, req, tmpdir):
1725        """
1726        Get the ns2 content and put it into a file for parsing.  Call the local
1727        or remote parser and return the topdl.Topology.  Errors result in
1728        exceptions.  req is the request and tmpdir is a work directory.
1729        """
1730
1731        # The tcl parser needs to read a file so put the content into that file
1732        descr=req.get('experimentdescription', None)
1733        if descr:
1734            if 'ns2description' in descr:
1735                file_content=descr['ns2description']
1736            elif 'topdldescription' in descr:
1737                return topdl.Topology(**descr['topdldescription'])
1738            else:
1739                raise service_error(service_error.req, 
1740                        'Unknown experiment description type')
1741        else:
1742            raise service_error(service_error.req, "No experiment description")
1743
1744
1745        if self.splitter_url:
1746            self.log.debug("Calling remote topdl translator at %s" % \
1747                    self.splitter_url)
1748            top = self.remote_ns2topdl(self.splitter_url, file_content)
1749        else:
1750            tclfile = os.path.join(tmpdir, "experiment.tcl")
1751            if file_content:
1752                try:
1753                    f = open(tclfile, 'w')
1754                    f.write(file_content)
1755                    f.close()
1756                except EnvironmentError:
1757                    raise service_error(service_error.internal,
1758                            "Cannot write temp experiment description")
1759            else:
1760                raise service_error(service_error.req, 
1761                        "Only ns2descriptions supported")
1762            pid = "dummy"
1763            gid = "dummy"
1764            eid = "dummy"
1765
1766            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1767                str(self.muxmax), '-m', 'dummy']
1768
1769            tclcmd.extend([pid, gid, eid, tclfile])
1770
1771            self.log.debug("running local splitter %s", " ".join(tclcmd))
1772            # This is just fantastic.  As a side effect the parser copies
1773            # tb_compat.tcl into the current directory, so that directory
1774            # must be writable by the fedd user.  Doing this in the
1775            # temporary subdir ensures this is the case.
1776            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1777                    cwd=tmpdir)
1778            split_data = tclparser.stdout
1779
1780            top = topdl.topology_from_xml(file=split_data, top="experiment")
1781            os.remove(tclfile)
1782
1783        return top
1784
1785    def get_testbed_services(self, req, testbeds):
1786        """
1787        Parse the services section of the request into two dicts mapping
1788        testbed to lists of federated_service objects.  The first dict maps all
1789        exporters of services to those service objects, the second maps
1790        testbeds to service objects only for services requiring portals.
1791        """
1792        # We construct both dicts here because deriving the second is more
1793        # comples than it looks - both the keys and lists can differ, and it's
1794        # much easier to generate both in one pass.
1795        masters = { }
1796        pmasters = { }
1797        for s in req.get('service', []):
1798            # If this is a service request with the importall field
1799            # set, fill it out.
1800
1801            if s.get('importall', False):
1802                s['import'] = [ tb for tb in testbeds \
1803                        if tb not in s.get('export',[])]
1804                del s['importall']
1805
1806            # Add the service to masters
1807            for tb in s.get('export', []):
1808                if s.get('name', None):
1809
1810                    params = { }
1811                    for a in s.get('fedAttr', []):
1812                        params[a.get('attribute', '')] = a.get('value','')
1813
1814                    fser = federated_service(name=s['name'],
1815                            exporter=tb, importers=s.get('import',[]),
1816                            params=params)
1817                    if fser.name == 'hide_hosts' \
1818                            and 'hosts' not in fser.params:
1819                        fser.params['hosts'] = \
1820                                ",".join(tb_hosts.get(fser.exporter, []))
1821                    if tb in masters: masters[tb].append(fser)
1822                    else: masters[tb] = [fser]
1823
1824                    if fser.portal:
1825                        if tb in pmasters: pmasters[tb].append(fser)
1826                        else: pmasters[tb] = [fser]
1827                else:
1828                    self.log.error('Testbed service does not have name " + \
1829                            "and importers')
1830        return masters, pmasters
1831
1832    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1833        """
1834        Create the ssh keys necessary for interconnecting the portal nodes and
1835        the global hosts file for letting each segment know about the IP
1836        addresses in play.  Save these into the repo.  Add attributes to the
1837        autorizer allowing access controllers to download them and return a set
1838        of attributes that inform the segments where to find this stuff.  May
1839        raise service_errors in if there are problems.
1840        """
1841        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1842        gw_secretkey_base = "fed.%s" % self.ssh_type
1843        keydir = os.path.join(tmpdir, 'keys')
1844        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1845        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1846
1847        try:
1848            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1849        except ValueError:
1850            raise service_error(service_error.server_config, 
1851                    "Bad key type (%s)" % self.ssh_type)
1852
1853        self.generate_seer_certs(keydir)
1854
1855        # Copy configuration files into the remote file store
1856        # The config urlpath
1857        configpath = "/%s/config" % expid
1858        # The config file system location
1859        configdir ="%s%s" % ( self.repodir, configpath)
1860        try:
1861            os.makedirs(configdir)
1862        except EnvironmentError, e:
1863            raise service_error(service_error.internal,
1864                    "Cannot create config directory: %s" % e)
1865        try:
1866            f = open("%s/hosts" % configdir, "w")
1867            print >> f, string.join(hosts, '\n')
1868            f.close()
1869        except EnvironmentError, e:
1870            raise service_error(service_error.internal, 
1871                    "Cannot write hosts file: %s" % e)
1872        try:
1873            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1874            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1875            copy_file(os.path.join(keydir, 'ca.pem'), 
1876                    os.path.join(configdir, 'ca.pem'))
1877            copy_file(os.path.join(keydir, 'node.pem'), 
1878                    os.path.join(configdir, 'node.pem'))
1879        except EnvironmentError, e:
1880            raise service_error(service_error.internal, 
1881                    "Cannot copy keyfiles: %s" % e)
1882
1883        # Allow the individual testbeds to access the configuration files,
1884        # again by setting an attribute for the relevant pathnames on each
1885        # allocation principal.  Yeah, that's a long list comprehension.
1886        self.append_experiment_authorization(expid, set([
1887            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1888                    for tb in tbparams.keys() \
1889                        for f in ("hosts", 'ca.pem', 'node.pem', 
1890                            gw_secretkey_base, gw_pubkey_base)]))
1891
1892        attrs = [ 
1893                {
1894                    'attribute': 'ssh_pubkey', 
1895                    'value': '%s/%s/config/%s' % \
1896                            (self.repo_url, expid, gw_pubkey_base)
1897                },
1898                {
1899                    'attribute': 'ssh_secretkey', 
1900                    'value': '%s/%s/config/%s' % \
1901                            (self.repo_url, expid, gw_secretkey_base)
1902                },
1903                {
1904                    'attribute': 'hosts', 
1905                    'value': '%s/%s/config/hosts' % \
1906                            (self.repo_url, expid)
1907                },
1908                {
1909                    'attribute': 'seer_ca_pem', 
1910                    'value': '%s/%s/config/%s' % \
1911                            (self.repo_url, expid, 'ca.pem')
1912                },
1913                {
1914                    'attribute': 'seer_node_pem', 
1915                    'value': '%s/%s/config/%s' % \
1916                            (self.repo_url, expid, 'node.pem')
1917                },
1918            ]
1919        return attrs
1920
1921
1922    def get_vtopo(self, req, fid):
1923        """
1924        Return the stored virtual topology for this experiment
1925        """
1926        rv = None
1927        state = None
1928        self.log.info("vtopo call started for %s" %  fid)
1929
1930        req = req.get('VtopoRequestBody', None)
1931        if not req:
1932            raise service_error(service_error.req,
1933                    "Bad request format (no VtopoRequestBody)")
1934        exp = req.get('experiment', None)
1935        if exp:
1936            if exp.has_key('fedid'):
1937                key = exp['fedid']
1938                keytype = "fedid"
1939            elif exp.has_key('localname'):
1940                key = exp['localname']
1941                keytype = "localname"
1942            else:
1943                raise service_error(service_error.req, "Unknown lookup type")
1944        else:
1945            raise service_error(service_error.req, "No request?")
1946
1947        try:
1948            proof = self.check_experiment_access(fid, key)
1949        except service_error, e:
1950            self.log.info("vtopo call failed for %s: access denied" %  fid)
1951            raise e
1952
1953        self.state_lock.acquire()
1954        # XXX: this needs to be recalculated
1955        if key in self.state:
1956            if self.state[key].top is not None:
1957                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1958                rv = { 'experiment' : {keytype: key },
1959                        'vtopo': vtopo,
1960                        'proof': proof.to_dict(), 
1961                    }
1962            else:
1963                state = self.state[key].status
1964        self.state_lock.release()
1965
1966        if rv: 
1967            self.log.info("vtopo call completed for %s %s " % \
1968                (key, fid))
1969            return rv
1970        else: 
1971            if state:
1972                self.log.info("vtopo call completed for %s %s (Not ready)" % \
1973                    (key, fid))
1974                raise service_error(service_error.partial, 
1975                        "Not ready: %s" % state)
1976            else:
1977                self.log.info("vtopo call completed for %s %s (No experiment)"\
1978                        % (key, fid))
1979                raise service_error(service_error.req, "No such experiment")
1980
1981    def get_vis(self, req, fid):
1982        """
1983        Return the stored visualization for this experiment
1984        """
1985        rv = None
1986        state = None
1987
1988        self.log.info("vis call started for %s" %  fid)
1989        req = req.get('VisRequestBody', None)
1990        if not req:
1991            raise service_error(service_error.req,
1992                    "Bad request format (no VisRequestBody)")
1993        exp = req.get('experiment', None)
1994        if exp:
1995            if exp.has_key('fedid'):
1996                key = exp['fedid']
1997                keytype = "fedid"
1998            elif exp.has_key('localname'):
1999                key = exp['localname']
2000                keytype = "localname"
2001            else:
2002                raise service_error(service_error.req, "Unknown lookup type")
2003        else:
2004            raise service_error(service_error.req, "No request?")
2005
2006        try:
2007            proof = self.check_experiment_access(fid, key)
2008        except service_error, e:
2009            self.log.info("vis call failed for %s: access denied" %  fid)
2010            raise e
2011
2012        self.state_lock.acquire()
2013        # Generate the visualization
2014        if key in self.state:
2015            if self.state[key].top is not None:
2016                try:
2017                    vis = self.genviz(
2018                            topdl.topology_to_vtopo(self.state[key].top))
2019                except service_error, e:
2020                    self.state_lock.release()
2021                    raise e
2022                rv =  { 'experiment' : {keytype: key },
2023                        'vis': vis,
2024                        'proof': proof.to_dict(), 
2025                        }
2026            else:
2027                state = self.state[key].status
2028        self.state_lock.release()
2029
2030        if rv: 
2031            self.log.info("vis call completed for %s %s " % \
2032                (key, fid))
2033            return rv
2034        else:
2035            if state:
2036                self.log.info("vis call completed for %s %s (not ready)" % \
2037                    (key, fid))
2038                raise service_error(service_error.partial, 
2039                        "Not ready: %s" % state)
2040            else:
2041                self.log.info("vis call completed for %s %s (no experiment)" % \
2042                    (key, fid))
2043                raise service_error(service_error.req, "No such experiment")
2044
2045   
2046    def save_federant_information(self, allocated, tbparams, eid, top):
2047        """
2048        Store the various data that have changed in the experiment state
2049        between when it was started and the beginning of resource allocation.
2050        This is basically the information about each local allocation.  This
2051        fills in the values of the placeholder allocation in the state.  It
2052        also collects the access proofs and returns them as dicts for a
2053        response message.
2054        """
2055        self.state_lock.acquire()
2056        exp = self.state[eid]
2057        exp.top = top.clone()
2058        # save federant information
2059        for k in allocated.keys():
2060            exp.add_allocation(tbparams[k])
2061            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2062                type="testbed", localname=[k], 
2063                service=[ s.to_topdl() for s in tbparams[k].services]))
2064
2065        # Access proofs for the response message
2066        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2067                    for p in tbparams[k].proof]
2068        exp.updated()
2069        if self.state_filename: 
2070            self.write_state()
2071        self.state_lock.release()
2072        return proofs
2073
2074    def clear_placeholder(self, eid, expid, tmpdir):
2075        """
2076        Clear the placeholder and remove any allocated temporary dir.
2077        """
2078
2079        self.state_lock.acquire()
2080        del self.state[eid]
2081        del self.state[expid]
2082        if self.state_filename: self.write_state()
2083        self.state_lock.release()
2084        if tmpdir and self.cleanup:
2085            self.remove_dirs(tmpdir)
2086
2087    # end of create_experiment sub-functions
2088
2089    def create_experiment(self, req, fid):
2090        """
2091        The external interface to experiment creation called from the
2092        dispatcher.
2093
2094        Creates a working directory, splits the incoming description using the
2095        splitter script and parses out the various subsections using the
2096        classes above.  Once each sub-experiment is created, use pooled threads
2097        to instantiate them and start it all up.
2098        """
2099
2100        self.log.info("Create experiment call started for %s" % fid)
2101        req = req.get('CreateRequestBody', None)
2102        if req:
2103            key = self.get_experiment_key(req)
2104        else:
2105            raise service_error(service_error.req,
2106                    "Bad request format (no CreateRequestBody)")
2107
2108        # Import information from the requester
2109        if self.auth.import_credentials(data_list=req.get('credential', [])):
2110            self.auth.save()
2111        else:
2112            self.log.debug("Failed to import delegation credentials(!)")
2113
2114        try:
2115            # Make sure that the caller can talk to us
2116            proof = self.check_experiment_access(fid, key)
2117        except service_error, e:
2118            self.log.info("Create experiment call failed for %s: access denied"\
2119                    % fid)
2120            raise e
2121
2122
2123        # Install the testbed map entries supplied with the request into a copy
2124        # of the testbed map.
2125        tbmap = dict(self.tbmap)
2126        tbactive = set(self.tbactive)
2127        for m in req.get('testbedmap', []):
2128            if 'testbed' in m and 'uri' in m:
2129                tbmap[m['testbed']] = m['uri']
2130                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2131
2132        # a place to work
2133        try:
2134            tmpdir = tempfile.mkdtemp(prefix="split-")
2135            os.mkdir(tmpdir+"/keys")
2136        except EnvironmentError:
2137            raise service_error(service_error.internal, "Cannot create tmp dir")
2138
2139        tbparams = { }
2140
2141        eid, expid, expcert_file = \
2142                self.get_experiment_ids_and_start(key, tmpdir)
2143
2144        # This catches exceptions to clear the placeholder if necessary
2145        try: 
2146            if not (eid and expid):
2147                raise service_error(service_error.internal, 
2148                        "Cannot find local experiment info!?")
2149
2150            top = self.get_topology(req, tmpdir)
2151            self.confirm_software(top)
2152            # Assign the IPs
2153            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2154            # Find the testbeds to look up
2155            tb_hosts = { }
2156            testbeds = [ ]
2157            for e in top.elements:
2158                if isinstance(e, topdl.Computer):
2159                    tb = e.get_attribute('testbed') or 'default'
2160                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2161                    else: 
2162                        tb_hosts[tb] = [ e.name ]
2163                        testbeds.append(tb)
2164
2165            masters, pmasters = self.get_testbed_services(req, testbeds)
2166            allocated = { }         # Testbeds we can access
2167            topo ={ }               # Sub topologies
2168            connInfo = { }          # Connection information
2169
2170            self.split_topology(top, topo, testbeds)
2171
2172            self.get_access_to_testbeds(testbeds, fid, allocated, 
2173                    tbparams, masters, tbmap, expid, expcert_file)
2174
2175            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2176
2177            part = experiment_partition(self.auth, self.store_url, tbmap,
2178                    self.muxmax, self.direct_transit, tbactive)
2179            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2180                    connInfo, expid)
2181
2182            auth_attrs = set()
2183            # Now get access to the dynamic testbeds (those added above)
2184            for tb in [ t for t in topo if t not in allocated]:
2185                self.get_access(tb, tbparams, fid, masters, tbmap, 
2186                        expid, expcert_file)
2187                allocated[tb] = 1
2188                store_keys = topo[tb].get_attribute('store_keys')
2189                # Give the testbed access to keys it exports or imports
2190                if store_keys:
2191                    auth_attrs.update(set([
2192                        (tbparams[tb].allocID, sk) \
2193                                for sk in store_keys.split(" ")]))
2194
2195            if auth_attrs:
2196                self.append_experiment_authorization(expid, auth_attrs)
2197
2198            # transit and disconnected testbeds may not have a connInfo entry.
2199            # Fill in the blanks.
2200            for t in allocated.keys():
2201                if not connInfo.has_key(t):
2202                    connInfo[t] = { }
2203
2204            self.wrangle_software(expid, top, topo, tbparams)
2205
2206            proofs = self.save_federant_information(allocated, tbparams, 
2207                    eid, top)
2208        except service_error, e:
2209            # If something goes wrong in the parse (usually an access error)
2210            # clear the placeholder state.  From here on out the code delays
2211            # exceptions.  Failing at this point returns a fault to the remote
2212            # caller.
2213
2214            self.log.info("Create experiment call failed for %s %s: %s" % 
2215                    (eid, fid, e))
2216            self.clear_placeholder(eid, expid, tmpdir)
2217            raise e
2218
2219        # Start the background swapper and return the starting state.  From
2220        # here on out, the state will stick around a while.
2221
2222        # Create a logger that logs to the experiment's state object as well as
2223        # to the main log file.
2224        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2225        alloc_collector = self.list_log(self.state[eid].log)
2226        h = logging.StreamHandler(alloc_collector)
2227        # XXX: there should be a global one of these rather than repeating the
2228        # code.
2229        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2230                    '%d %b %y %H:%M:%S'))
2231        alloc_log.addHandler(h)
2232
2233        # Start a thread to do the resource allocation
2234        t  = Thread(target=self.allocate_resources,
2235                args=(allocated, masters, eid, expid, tbparams, 
2236                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2237                    connInfo, tbmap, expcert_file),
2238                name=eid)
2239        t.start()
2240
2241        rv = {
2242                'experimentID': [
2243                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2244                ],
2245                'experimentStatus': 'starting',
2246                'proof': [ proof.to_dict() ] + proofs,
2247            }
2248        self.log.info("Create experiment call succeeded for %s %s" % \
2249                (eid, fid))
2250
2251        return rv
2252   
2253    def get_experiment_fedid(self, key):
2254        """
2255        find the fedid associated with the localname key in the state database.
2256        """
2257
2258        rv = None
2259        self.state_lock.acquire()
2260        if key in self.state:
2261            rv = self.state[key].fedid
2262        self.state_lock.release()
2263        return rv
2264
2265    def check_experiment_access(self, fid, key):
2266        """
2267        Confirm that the fid has access to the experiment.  Though a request
2268        may be made in terms of a local name, the access attribute is always
2269        the experiment's fedid.
2270        """
2271        if not isinstance(key, fedid):
2272            key = self.get_experiment_fedid(key)
2273
2274        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2275
2276        if access_ok:
2277            return proof
2278        else:
2279            raise service_error(service_error.access, "Access Denied",
2280                proof)
2281
2282
2283    def get_handler(self, path, fid):
2284        """
2285        Perhaps surprisingly named, this function handles HTTP GET requests to
2286        this server (SOAP requests are POSTs).
2287        """
2288        self.log.info("Get handler %s %s" % (path, fid))
2289        # XXX: log proofs?
2290        if self.auth.check_attribute(fid, path):
2291            return ("%s/%s" % (self.repodir, path), "application/binary")
2292        else:
2293            return (None, None)
2294
2295    def update_info(self, key, force=False):
2296        top = None
2297        self.state_lock.acquire()
2298        if key in self.state:
2299            if force or self.state[key].older_than(self.info_cache_limit):
2300                top = self.state[key].top
2301                if top is not None: top = top.clone()
2302                d1, info_params, cert, d2 = \
2303                        self.get_segment_info(self.state[key], need_lock=False)
2304        self.state_lock.release()
2305
2306        if top is None: return
2307
2308        try:
2309            tmpdir = tempfile.mkdtemp(prefix="info-")
2310        except EnvironmentError:
2311            raise service_error(service_error.internal, 
2312                    "Cannot create tmp dir")
2313        cert_file = self.make_temp_certfile(cert, tmpdir)
2314
2315        data = []
2316        try:
2317            for k, (uri, aid) in info_params.items():
2318                info=self.info_segment(log=self.log, testbed=uri,
2319                            cert_file=cert_file, cert_pwd=None,
2320                            trusted_certs=self.trusted_certs,
2321                            caller=self.call_InfoSegment)
2322                info(uri, aid)
2323                data.append(info)
2324        # Clean up the tmpdir no matter what
2325        finally:
2326            if tmpdir: self.remove_dirs(tmpdir)
2327
2328        self.annotate_topology(top, data)
2329        self.state_lock.acquire()
2330        if key in self.state:
2331            self.state[key].top = top
2332            self.state[key].updated()
2333            if self.state_filename: self.write_state()
2334        self.state_lock.release()
2335
2336   
2337    def get_info(self, req, fid):
2338        """
2339        Return all the stored info about this experiment
2340        """
2341        rv = None
2342
2343        self.log.info("Info call started for %s" %  fid)
2344        req = req.get('InfoRequestBody', None)
2345        if not req:
2346            raise service_error(service_error.req,
2347                    "Bad request format (no InfoRequestBody)")
2348        exp = req.get('experiment', None)
2349        legacy = req.get('legacy', False)
2350        fresh = req.get('fresh', False)
2351        if exp:
2352            if exp.has_key('fedid'):
2353                key = exp['fedid']
2354                keytype = "fedid"
2355            elif exp.has_key('localname'):
2356                key = exp['localname']
2357                keytype = "localname"
2358            else:
2359                raise service_error(service_error.req, "Unknown lookup type")
2360        else:
2361            raise service_error(service_error.req, "No request?")
2362
2363        try:
2364            proof = self.check_experiment_access(fid, key)
2365        except service_error, e:
2366            self.log.info("Info call failed for %s: access denied" %  fid)
2367
2368
2369        self.update_info(key, fresh)
2370
2371        self.state_lock.acquire()
2372        if self.state.has_key(key):
2373            rv = self.state[key].get_info()
2374            # Copy the topo if we need legacy annotations
2375            if legacy:
2376                top = self.state[key].top
2377                if top is not None: top = top.clone()
2378        self.state_lock.release()
2379        self.log.info("Gathered Info for %s %s" % (key, fid))
2380
2381        # If the legacy visualization and topology representations are
2382        # requested, calculate them and add them to the return.
2383        if legacy and rv is not None:
2384            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2385            if top is not None:
2386                vtopo = topdl.topology_to_vtopo(top)
2387                if vtopo is not None:
2388                    rv['vtopo'] = vtopo
2389                    try:
2390                        vis = self.genviz(vtopo)
2391                    except service_error, e:
2392                        self.log.debug('Problem generating visualization: %s' \
2393                                % e)
2394                        vis = None
2395                    if vis is not None:
2396                        rv['vis'] = vis
2397        if rv:
2398            self.log.info("Info succeded for %s %s" % (key, fid))
2399            rv['proof'] = proof.to_dict()
2400            return rv
2401        else: 
2402            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2403            raise service_error(service_error.req, "No such experiment")
2404
2405    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2406            results):
2407        """
2408        Call OperateSegment on multiple testbeds and gather the results.
2409        op_params contains the parameters needed to contact that testbed, cert
2410        is a certificate containing the fedid to use, op is the operation,
2411        testbeds is a dict mapping testbed name to targets in that testbed,
2412        params are the parameters to include a,d results is a growing list of
2413        the results of the calls.
2414        """
2415        try:
2416            tmpdir = tempfile.mkdtemp(prefix="info-")
2417        except EnvironmentError:
2418            raise service_error(service_error.internal, 
2419                    "Cannot create tmp dir")
2420        cert_file = self.make_temp_certfile(cert, tmpdir)
2421
2422        try:
2423            for tb, targets in testbeds.items():
2424                if tb in op_params:
2425                    uri, aid = op_params[tb]
2426                    operate=self.operation_segment(log=self.log, testbed=uri,
2427                                cert_file=cert_file, cert_pwd=None,
2428                                trusted_certs=self.trusted_certs,
2429                                caller=self.call_OperationSegment)
2430                    if operate(uri, aid, op, targets, params):
2431                        if operate.status is not None:
2432                            results.extend(operate.status)
2433                            continue
2434                # Something went wrong in a weird way.  Add statuses
2435                # that reflect that to results
2436                for t in targets:
2437                    results.append(operation_status(t, 
2438                        operation_status.federant,
2439                        'Unexpected error on %s' % tb))
2440        # Clean up the tmpdir no matter what
2441        finally:
2442            if tmpdir: self.remove_dirs(tmpdir)
2443
2444    def do_operation(self, req, fid):
2445        """
2446        Find the testbeds holding each target and ask them to carry out the
2447        operation.  Return the statuses.
2448        """
2449        # Map an element to the testbed containing it
2450        def element_to_tb(e):
2451            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2452            elif isinstance(e, topdl.Testbed): return e.name
2453            else: return None
2454        # If d is an operation_status object, make it a dict
2455        def make_dict(d):
2456            if isinstance(d, dict): return d
2457            elif isinstance(d, operation_status): return d.to_dict()
2458            else: return { }
2459
2460        def element_name(e):
2461            if isinstance(e, topdl.Computer): return e.name
2462            elif isinstance(e, topdl.Testbed): 
2463                if e.localname: return e.localname[0]
2464                else: return None
2465            else: return None
2466
2467        self.log.info("Operation call started for %s" %  fid)
2468        req = req.get('OperationRequestBody', None)
2469        if not req:
2470            raise service_error(service_error.req,
2471                    "Bad request format (no OperationRequestBody)")
2472        exp = req.get('experiment', None)
2473        op = req.get('operation', None)
2474        targets = set(req.get('target', []))
2475        params = req.get('parameter', None)
2476
2477        if exp:
2478            if 'fedid' in exp:
2479                key = exp['fedid']
2480                keytype = "fedid"
2481            elif 'localname' in exp:
2482                key = exp['localname']
2483                keytype = "localname"
2484            else:
2485                raise service_error(service_error.req, "Unknown lookup type")
2486        else:
2487            raise service_error(service_error.req, "No request?")
2488
2489        if op is None or not targets:
2490            raise service_error(service_error.req, "No request?")
2491
2492        try:
2493            proof = self.check_experiment_access(fid, key)
2494        except service_error, e:
2495            self.log.info("Operation call failed for %s: access denied" %  fid)
2496            raise e
2497
2498        self.state_lock.acquire()
2499        if key in self.state:
2500            d1, op_params, cert, d2 = \
2501                    self.get_segment_info(self.state[key], need_lock=False,
2502                            key='tb')
2503            top = self.state[key].top
2504            if top is not None:
2505                top = top.clone()
2506        self.state_lock.release()
2507
2508        if top is None:
2509            self.log.info("Operation call failed for %s: not active" %  fid)
2510            raise service_error(service_error.partial, "No topology yet", 
2511                    proof=proof)
2512
2513        testbeds = { }
2514        results = []
2515        for e in top.elements:
2516            ename = element_name(e)
2517            if ename in targets:
2518                tb = element_to_tb(e)
2519                targets.remove(ename)
2520                if tb is not None:
2521                    if tb in testbeds: testbeds[tb].append(ename)
2522                    else: testbeds[tb] = [ ename ]
2523                else:
2524                    results.append(operation_status(e.name, 
2525                        code=operation_status.no_target, 
2526                        description='Cannot map target to testbed'))
2527
2528        for t in targets:
2529            results.append(operation_status(t, operation_status.no_target))
2530
2531        self.operate_on_segments(op_params, cert, op, testbeds, params,
2532                results)
2533
2534        self.log.info("Operation call succeeded for %s" %  fid)
2535        return { 
2536                'experiment': exp, 
2537                'status': [make_dict(r) for r in results],
2538                'proof': proof.to_dict()
2539                }
2540
2541
2542    def get_multi_info(self, req, fid):
2543        """
2544        Return all the stored info that this fedid can access
2545        """
2546        rv = { 'info': [ ], 'proof': [ ] }
2547
2548        self.log.info("Multi Info call started for %s" %  fid)
2549        self.state_lock.acquire()
2550        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2551            try:
2552                proof = self.check_experiment_access(fid, key)
2553            except service_error, e:
2554                if e.code == service_error.access:
2555                    continue
2556                else:
2557                    self.log.info("Multi Info call failed for %s: %s" %  \
2558                            (e,fid))
2559                    self.state_lock.release()
2560                    raise e
2561
2562            if self.state.has_key(key):
2563                e = self.state[key].get_info()
2564                e['proof'] = proof.to_dict()
2565                rv['info'].append(e)
2566                rv['proof'].append(proof.to_dict())
2567        self.state_lock.release()
2568        self.log.info("Multi Info call succeeded for %s" %  fid)
2569        return rv
2570
2571    def check_termination_status(self, fed_exp, force):
2572        """
2573        Confirm that the experiment is sin a valid state to stop (or force it)
2574        return the state - invalid states for deletion and force settings cause
2575        exceptions.
2576        """
2577        self.state_lock.acquire()
2578        status = fed_exp.status
2579
2580        if status:
2581            if status in ('starting', 'terminating'):
2582                if not force:
2583                    self.state_lock.release()
2584                    raise service_error(service_error.partial, 
2585                            'Experiment still being created or destroyed')
2586                else:
2587                    self.log.warning('Experiment in %s state ' % status + \
2588                            'being terminated by force.')
2589            self.state_lock.release()
2590            return status
2591        else:
2592            # No status??? trouble
2593            self.state_lock.release()
2594            raise service_error(service_error.internal,
2595                    "Experiment has no status!?")
2596
2597    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2598        ids = []
2599        term_params = { }
2600        if need_lock: self.state_lock.acquire()
2601        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2602        expcert = fed_exp.identity
2603        repo = "%s" % fed_exp.fedid
2604
2605        # Collect the allocation/segment ids into a dict keyed by the fedid
2606        # of the allocation that contains a tuple of uri, aid
2607        for i, fed in enumerate(fed_exp.get_all_allocations()):
2608            uri = fed.uri
2609            aid = fed.allocID
2610            if key == 'aid': term_params[aid] = (uri, aid)
2611            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2612
2613        if need_lock: self.state_lock.release()
2614        return ids, term_params, expcert, repo
2615
2616
2617    def get_termination_info(self, fed_exp):
2618        self.state_lock.acquire()
2619        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2620        # Change the experiment state
2621        fed_exp.status = 'terminating'
2622        fed_exp.updated()
2623        if self.state_filename: self.write_state()
2624        self.state_lock.release()
2625
2626        return ids, term_params, expcert, repo
2627
2628
2629    def deallocate_resources(self, term_params, expcert, status, force, 
2630            dealloc_log):
2631        tmpdir = None
2632        # This try block makes sure the tempdir is cleared
2633        try:
2634            # If no expcert, try the deallocation as the experiment
2635            # controller instance.
2636            if expcert and self.auth_type != 'legacy': 
2637                try:
2638                    tmpdir = tempfile.mkdtemp(prefix="term-")
2639                except EnvironmentError:
2640                    raise service_error(service_error.internal, 
2641                            "Cannot create tmp dir")
2642                cert_file = self.make_temp_certfile(expcert, tmpdir)
2643                pw = None
2644            else: 
2645                cert_file = self.cert_file
2646                pw = self.cert_pwd
2647
2648            # Stop everyone.  NB, wait_for_all waits until a thread starts
2649            # and then completes, so we can't wait if nothing starts.  So,
2650            # no tbparams, no start.
2651            if len(term_params) > 0:
2652                tp = thread_pool(self.nthreads)
2653                for k, (uri, aid) in term_params.items():
2654                    # Create and start a thread to stop the segment
2655                    tp.wait_for_slot()
2656                    t  = pooled_thread(\
2657                            target=self.terminate_segment(log=dealloc_log,
2658                                testbed=uri,
2659                                cert_file=cert_file, 
2660                                cert_pwd=pw,
2661                                trusted_certs=self.trusted_certs,
2662                                caller=self.call_TerminateSegment),
2663                            args=(uri, aid), name=k,
2664                            pdata=tp, trace_file=self.trace_file)
2665                    t.start()
2666                # Wait for completions
2667                tp.wait_for_all_done()
2668
2669            # release the allocations (failed experiments have done this
2670            # already, and starting experiments may be in odd states, so we
2671            # ignore errors releasing those allocations
2672            try: 
2673                for k, (uri, aid)  in term_params.items():
2674                    self.release_access(None, aid, uri=uri,
2675                            cert_file=cert_file, cert_pwd=pw)
2676            except service_error, e:
2677                if status != 'failed' and not force:
2678                    raise e
2679
2680        # Clean up the tmpdir no matter what
2681        finally:
2682            if tmpdir: self.remove_dirs(tmpdir)
2683
2684    def terminate_experiment(self, req, fid):
2685        """
2686        Swap this experiment out on the federants and delete the shared
2687        information
2688        """
2689        self.log.info("Terminate experiment call started for %s" % fid)
2690        tbparams = { }
2691        req = req.get('TerminateRequestBody', None)
2692        if not req:
2693            raise service_error(service_error.req,
2694                    "Bad request format (no TerminateRequestBody)")
2695
2696        key = self.get_experiment_key(req, 'experiment')
2697        try:
2698            proof = self.check_experiment_access(fid, key)
2699        except service_error, e:
2700            self.log.info(
2701                    "Terminate experiment call failed for %s: access denied" \
2702                            % fid)
2703            raise e
2704        exp = req.get('experiment', False)
2705        force = req.get('force', False)
2706
2707        dealloc_list = [ ]
2708
2709
2710        # Create a logger that logs to the dealloc_list as well as to the main
2711        # log file.
2712        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2713        dealloc_log.info("Terminating %s " %key)
2714        h = logging.StreamHandler(self.list_log(dealloc_list))
2715        # XXX: there should be a global one of these rather than repeating the
2716        # code.
2717        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2718                    '%d %b %y %H:%M:%S'))
2719        dealloc_log.addHandler(h)
2720
2721        self.state_lock.acquire()
2722        fed_exp = self.state.get(key, None)
2723        self.state_lock.release()
2724        repo = None
2725
2726        if fed_exp:
2727            status = self.check_termination_status(fed_exp, force)
2728            # get_termination_info updates the experiment state
2729            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2730            self.deallocate_resources(term_params, expcert, status, force, 
2731                    dealloc_log)
2732
2733            # Remove the terminated experiment
2734            self.state_lock.acquire()
2735            for id in ids:
2736                self.clear_experiment_authorization(id, need_state_lock=False)
2737                if id in self.state: del self.state[id]
2738
2739            if self.state_filename: self.write_state()
2740            self.state_lock.release()
2741
2742            # Delete any synch points associated with this experiment.  All
2743            # synch points begin with the fedid of the experiment.
2744            fedid_keys = set(["fedid:%s" % f for f in ids \
2745                    if isinstance(f, fedid)])
2746            for k in self.synch_store.all_keys():
2747                try:
2748                    if len(k) > 45 and k[0:46] in fedid_keys:
2749                        self.synch_store.del_value(k)
2750                except synch_store.BadDeletionError:
2751                    pass
2752            self.write_store()
2753
2754            # Remove software and other cached stuff from the filesystem.
2755            if repo:
2756                self.remove_dirs("%s/%s" % (self.repodir, repo))
2757       
2758            self.log.info("Terminate experiment succeeded for %s %s" % \
2759                    (key, fid))
2760            return { 
2761                    'experiment': exp , 
2762                    'deallocationLog': string.join(dealloc_list, ''),
2763                    'proof': [proof.to_dict()],
2764                    }
2765        else:
2766            self.log.info("Terminate experiment failed for %s %s: no state" % \
2767                    (key, fid))
2768            raise service_error(service_error.req, "No saved state")
2769
2770
2771    def GetValue(self, req, fid):
2772        """
2773        Get a value from the synchronized store
2774        """
2775        req = req.get('GetValueRequestBody', None)
2776        if not req:
2777            raise service_error(service_error.req,
2778                    "Bad request format (no GetValueRequestBody)")
2779       
2780        name = req.get('name', None)
2781        wait = req.get('wait', False)
2782        rv = { 'name': name }
2783
2784        if not name:
2785            raise service_error(service_error.req, "No name?")
2786
2787        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2788
2789        if access_ok:
2790            self.log.debug("[GetValue] asking for %s " % name)
2791            try:
2792                v = self.synch_store.get_value(name, wait)
2793            except synch_store.RevokedKeyError:
2794                # No more synch on this key
2795                raise service_error(service_error.federant, 
2796                        "Synch key %s revoked" % name)
2797            if v is not None:
2798                rv['value'] = v
2799            rv['proof'] = proof.to_dict()
2800            self.log.debug("[GetValue] got %s from %s" % (v, name))
2801            return rv
2802        else:
2803            raise service_error(service_error.access, "Access Denied",
2804                    proof=proof)
2805       
2806
2807    def SetValue(self, req, fid):
2808        """
2809        Set a value in the synchronized store
2810        """
2811        req = req.get('SetValueRequestBody', None)
2812        if not req:
2813            raise service_error(service_error.req,
2814                    "Bad request format (no SetValueRequestBody)")
2815       
2816        name = req.get('name', None)
2817        v = req.get('value', '')
2818
2819        if not name:
2820            raise service_error(service_error.req, "No name?")
2821
2822        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2823
2824        if access_ok:
2825            try:
2826                self.synch_store.set_value(name, v)
2827                self.write_store()
2828                self.log.debug("[SetValue] set %s to %s" % (name, v))
2829            except synch_store.CollisionError:
2830                # Translate into a service_error
2831                raise service_error(service_error.req,
2832                        "Value already set: %s" %name)
2833            except synch_store.RevokedKeyError:
2834                # No more synch on this key
2835                raise service_error(service_error.federant, 
2836                        "Synch key %s revoked" % name)
2837                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2838        else:
2839            raise service_error(service_error.access, "Access Denied",
2840                    proof=proof)
Note: See TracBrowser for help on using the repository browser.