source: fedd/federation/experiment_control.py @ cc6091c

compt_changes
Last change on this file since cc6091c was cde9b98, checked in by Ted Faber <faber@…>, 13 years ago

More debug logging

  • Property mode set to 100644
File size: 92.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131
132        dt = config.get("experiment_control", "direct_transit")
133        self.auth_type = config.get('experiment_control', 'auth_type') \
134                or 'legacy'
135        self.auth_dir = config.get('experiment_control', 'auth_dir')
136        # XXX: document this!
137        self.info_cache_limit = \
138                config.getint('experiment_control', 'info_cache', 600)
139        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
140        else: self.direct_transit = [ ]
141        # NB for internal master/slave ops, not experiment setup
142        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
143
144        self.overrides = set([])
145        ovr = config.get('experiment_control', 'overrides')
146        if ovr:
147            for o in ovr.split(","):
148                o = o.strip()
149                if o.startswith('fedid:'): o = o[len('fedid:'):]
150                self.overrides.add(fedid(hexstr=o))
151
152        self.state = { }
153        self.state_lock = Lock()
154        self.tclsh = "/usr/local/bin/otclsh"
155        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
156                config.get("experiment_control", "tcl_splitter",
157                        "/usr/testbed/lib/ns2ir/parse.tcl")
158        mapdb_file = config.get("experiment_control", "mapdb")
159        self.trace_file = sys.stderr
160
161        self.def_expstart = \
162                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
163                "/tmp/federate";
164        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
165                "FEDDIR/hosts";
166        self.def_gwstart = \
167                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
168                "/tmp/bridge.log";
169        self.def_mgwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
171                "/tmp/bridge.log";
172        self.def_gwimage = "FBSD61-TUNNEL2";
173        self.def_gwtype = "pc";
174        self.local_access = { }
175
176        if self.auth_type == 'legacy':
177            if auth:
178                self.auth = auth
179            else:
180                self.log.error( "[access]: No authorizer initialized, " +\
181                        "creating local one.")
182                auth = authorizer()
183            self.get_access = self.legacy_get_access
184        elif self.auth_type == 'abac':
185            self.auth = abac_authorizer(load=self.auth_dir)
186        else:
187            raise service_error(service_error.internal, 
188                    "Unknown auth_type: %s" % self.auth_type)
189
190        if mapdb_file:
191            self.read_mapdb(mapdb_file)
192        else:
193            self.log.warn("[experiment_control] No testbed map, using defaults")
194            self.tbmap = { 
195                    'deter':'https://users.isi.deterlab.net:23235',
196                    'emulab':'https://users.isi.deterlab.net:23236',
197                    'ucb':'https://users.isi.deterlab.net:23237',
198                    }
199
200        # Grab saved state.  OK to do this w/o locking because it's read only
201        # and only one thread should be in existence that can see self.state at
202        # this point.
203        if self.state_filename:
204            self.read_state()
205
206        if self.store_filename:
207            self.read_store()
208        else:
209            self.log.warning("No saved synch store")
210            self.synch_store = synch_store
211
212        # Dispatch tables
213        self.soap_services = {\
214                'New': soap_handler('New', self.new_experiment),
215                'Create': soap_handler('Create', self.create_experiment),
216                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
217                'Vis': soap_handler('Vis', self.get_vis),
218                'Info': soap_handler('Info', self.get_info),
219                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
220                'Operation': soap_handler('Operation', self.do_operation),
221                'Terminate': soap_handler('Terminate', 
222                    self.terminate_experiment),
223                'GetValue': soap_handler('GetValue', self.GetValue),
224                'SetValue': soap_handler('SetValue', self.SetValue),
225        }
226
227        self.xmlrpc_services = {\
228                'New': xmlrpc_handler('New', self.new_experiment),
229                'Create': xmlrpc_handler('Create', self.create_experiment),
230                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
231                'Vis': xmlrpc_handler('Vis', self.get_vis),
232                'Info': xmlrpc_handler('Info', self.get_info),
233                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
234                'Terminate': xmlrpc_handler('Terminate',
235                    self.terminate_experiment),
236                'Operation': xmlrpc_handler('Operation', self.do_operation),
237                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
238                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
239        }
240
241    # Call while holding self.state_lock
242    def write_state(self):
243        """
244        Write a new copy of experiment state after copying the existing state
245        to a backup.
246
247        State format is a simple pickling of the state dictionary.
248        """
249        if os.access(self.state_filename, os.W_OK):
250            copy_file(self.state_filename, \
251                    "%s.bak" % self.state_filename)
252        try:
253            f = open(self.state_filename, 'w')
254            pickle.dump(self.state, f)
255        except EnvironmentError, e:
256            self.log.error("Can't write file %s: %s" % \
257                    (self.state_filename, e))
258        except pickle.PicklingError, e:
259            self.log.error("Pickling problem: %s" % e)
260        except TypeError, e:
261            self.log.error("Pickling problem (TypeError): %s" % e)
262
263    @staticmethod
264    def get_alloc_ids(exp):
265        """
266        Used by read_store and read state.  This used to be worse.
267        """
268
269        return [ a.allocID for a in exp.get_all_allocations() ]
270
271
272    # Call while holding self.state_lock
273    def read_state(self):
274        """
275        Read a new copy of experiment state.  Old state is overwritten.
276
277        State format is a simple pickling of the state dictionary.
278        """
279       
280        try:
281            f = open(self.state_filename, "r")
282            self.state = pickle.load(f)
283            self.log.debug("[read_state]: Read state from %s" % \
284                    self.state_filename)
285        except EnvironmentError, e:
286            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
287                    % (self.state_filename, e))
288        except pickle.UnpicklingError, e:
289            self.log.warning(("[read_state]: No saved state: " + \
290                    "Unpickling failed: %s") % e)
291       
292        for s in self.state.values():
293            try:
294
295                eid = s.fedid
296                if eid : 
297                    if self.auth_type == 'legacy':
298                        # XXX: legacy
299                        # Give the owner rights to the experiment
300                        #self.auth.set_attribute(s['owner'], eid)
301                        # And holders of the eid as well
302                        self.auth.set_attribute(eid, eid)
303                        # allow overrides to control experiments as well
304                        for o in self.overrides:
305                            self.auth.set_attribute(o, eid)
306                        # Set permissions to allow reading of the software
307                        # repo, if any, as well.
308                        for a in self.get_alloc_ids(s):
309                            self.auth.set_attribute(a, 'repo/%s' % eid)
310                else:
311                    raise KeyError("No experiment id")
312            except KeyError, e:
313                self.log.warning("[read_state]: State ownership or identity " +\
314                        "misformatted in %s: %s" % (self.state_filename, e))
315
316    def read_mapdb(self, file):
317        """
318        Read a simple colon separated list of mappings for the
319        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
320        also adds testbeds to active if they include , active after
321        their name.
322        """
323
324        self.tbmap = { }
325        self.tbactive = set()
326        lineno =0
327        try:
328            f = open(file, "r")
329            for line in f:
330                lineno += 1
331                line = line.strip()
332                if line.startswith('#') or len(line) == 0:
333                    continue
334                try:
335                    label, url = line.split(':', 1)
336                    if ',' in label:
337                        label, act = label.split(',', 1)
338                        active = (act.strip() == 'active')
339                    else:
340                        active = False
341                    self.tbmap[label] = url
342                    if active: self.tbactive.add(label)
343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
346        except EnvironmentError, e:
347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
349        else:
350            f.close()
351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
358        except EnvironmentError, e:
359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
391        except EnvironmentError, e:
392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
397
398    def remove_dirs(self, dir):
399        """
400        Remove the directory tree and all files rooted at dir.  Log any errors,
401        but continue.
402        """
403        self.log.debug("[removedirs]: removing %s" % dir)
404        try:
405            for path, dirs, files in os.walk(dir, topdown=False):
406                for f in files:
407                    os.remove(os.path.join(path, f))
408                for d in dirs:
409                    os.rmdir(os.path.join(path, d))
410            os.rmdir(dir)
411        except EnvironmentError, e:
412            self.log.error("Error deleting directory tree in %s" % e);
413
414    @staticmethod
415    def make_temp_certfile(expcert, tmpdir):
416        """
417        make a protected copy of the access certificate so the experiment
418        controller can act as the experiment principal.  mkstemp is the most
419        secure way to do that. The directory should be created by
420        mkdtemp.  Return the filename.
421        """
422        if expcert and tmpdir:
423            try:
424                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
425                f = os.fdopen(certf, 'w')
426                print >> f, expcert
427                f.close()
428            except EnvironmentError, e:
429                raise service_error(service_error.internal, 
430                        "Cannot create temp cert file?")
431            return certfn
432        else:
433            return None
434
435       
436    def generate_ssh_keys(self, dest, type="rsa" ):
437        """
438        Generate a set of keys for the gateways to use to talk.
439
440        Keys are of type type and are stored in the required dest file.
441        """
442        valid_types = ("rsa", "dsa")
443        t = type.lower();
444        if t not in valid_types: raise ValueError
445        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
446
447        try:
448            trace = open("/dev/null", "w")
449        except EnvironmentError:
450            raise service_error(service_error.internal,
451                    "Cannot open /dev/null??");
452
453        # May raise CalledProcessError
454        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
455        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
456        if rv != 0:
457            raise service_error(service_error.internal, 
458                    "Cannot generate nonce ssh keys.  %s return code %d" \
459                            % (self.ssh_keygen, rv))
460
461    def generate_seer_certs(self, destdir):
462        '''
463        Create a SEER ca cert and a node cert in destdir/ca.pem and
464        destdir/node.pem respectively.  These will be distributed throughout
465        the federated experiment.  This routine reports errors via
466        service_errors.
467        '''
468        openssl = '/usr/bin/openssl'
469        # All the filenames and parameters we need for openssl calls below
470        ca_key =os.path.join(destdir, 'ca.key') 
471        ca_pem = os.path.join(destdir, 'ca.pem')
472        node_key =os.path.join(destdir, 'node.key') 
473        node_pem = os.path.join(destdir, 'node.pem')
474        node_req = os.path.join(destdir, 'node.req')
475        node_signed = os.path.join(destdir, 'node.signed')
476        days = '%s' % (365 * 10)
477        serial = '%s' % random.randint(0, 1<<16)
478
479        try:
480            # Sequence of calls to create a CA key, create a ca cert, create a
481            # node key, node signing request, and finally a signed node
482            # certificate.
483            sequence = (
484                    (openssl, 'genrsa', '-out', ca_key, '1024'),
485                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
486                        ca_pem, '-days', days, '-subj', 
487                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
488                    (openssl, 'genrsa', '-out', node_key, '1024'),
489                    (openssl, 'req', '-new', '-key', node_key, '-out', 
490                        node_req, '-days', days, '-subj', 
491                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
492                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
493                        '-set_serial', serial, '-req', '-in', node_req, 
494                        '-out', node_signed, '-days', days),
495                )
496            # Do all that stuff; bail if there's an error, and push all the
497            # output to dev/null.
498            for cmd in sequence:
499                trace = open("/dev/null", "w")
500                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
501                if rv != 0:
502                    raise service_error(service_error.internal, 
503                            "Cannot generate SEER certs.  %s return code %d" \
504                                    % (' '.join(cmd), rv))
505            # Concatinate the node key and signed certificate into node.pem
506            f = open(node_pem, 'w')
507            for comp in (node_signed, node_key):
508                g = open(comp, 'r')
509                f.write(g.read())
510                g.close()
511            f.close()
512
513            # Throw out intermediaries.
514            for fn in (ca_key, node_key, node_req, node_signed):
515                os.unlink(fn)
516
517        except EnvironmentError, e:
518            # Any difficulties with the file system wind up here
519            raise service_error(service_error.internal,
520                    "File error on  %s while creating SEER certs: %s" % \
521                            (e.filename, e.strerror))
522
523
524
525    def gentopo(self, str):
526        """
527        Generate the topology data structure from the splitter's XML
528        representation of it.
529
530        The topology XML looks like:
531            <experiment>
532                <nodes>
533                    <node><vname></vname><ips>ip1:ip2</ips></node>
534                </nodes>
535                <lans>
536                    <lan>
537                        <vname></vname><vnode></vnode><ip></ip>
538                        <bandwidth></bandwidth><member>node:port</member>
539                    </lan>
540                </lans>
541        """
542        class topo_parse:
543            """
544            Parse the topology XML and create the dats structure.
545            """
546            def __init__(self):
547                # Typing of the subelements for data conversion
548                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
549                self.int_subelements = ( 'bandwidth',)
550                self.float_subelements = ( 'delay',)
551                # The final data structure
552                self.nodes = [ ]
553                self.lans =  [ ]
554                self.topo = { \
555                        'node': self.nodes,\
556                        'lan' : self.lans,\
557                    }
558                self.element = { }  # Current element being created
559                self.chars = ""     # Last text seen
560
561            def end_element(self, name):
562                # After each sub element the contents is added to the current
563                # element or to the appropriate list.
564                if name == 'node':
565                    self.nodes.append(self.element)
566                    self.element = { }
567                elif name == 'lan':
568                    self.lans.append(self.element)
569                    self.element = { }
570                elif name in self.str_subelements:
571                    self.element[name] = self.chars
572                    self.chars = ""
573                elif name in self.int_subelements:
574                    self.element[name] = int(self.chars)
575                    self.chars = ""
576                elif name in self.float_subelements:
577                    self.element[name] = float(self.chars)
578                    self.chars = ""
579
580            def found_chars(self, data):
581                self.chars += data.rstrip()
582
583
584        tp = topo_parse();
585        parser = xml.parsers.expat.ParserCreate()
586        parser.EndElementHandler = tp.end_element
587        parser.CharacterDataHandler = tp.found_chars
588
589        parser.Parse(str)
590
591        return tp.topo
592       
593
594    def genviz(self, topo):
595        """
596        Generate the visualization the virtual topology
597        """
598
599        neato = "/usr/local/bin/neato"
600        # These are used to parse neato output and to create the visualization
601        # file.
602        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
603        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
604                "%s</type></node>"
605
606        try:
607            # Node names
608            nodes = [ n['vname'] for n in topo['node'] ]
609            topo_lans = topo['lan']
610        except KeyError, e:
611            raise service_error(service_error.internal, "Bad topology: %s" %e)
612
613        lans = { }
614        links = { }
615
616        # Walk through the virtual topology, organizing the connections into
617        # 2-node connections (links) and more-than-2-node connections (lans).
618        # When a lan is created, it's added to the list of nodes (there's a
619        # node in the visualization for the lan).
620        for l in topo_lans:
621            if links.has_key(l['vname']):
622                if len(links[l['vname']]) < 2:
623                    links[l['vname']].append(l['vnode'])
624                else:
625                    nodes.append(l['vname'])
626                    lans[l['vname']] = links[l['vname']]
627                    del links[l['vname']]
628                    lans[l['vname']].append(l['vnode'])
629            elif lans.has_key(l['vname']):
630                lans[l['vname']].append(l['vnode'])
631            else:
632                links[l['vname']] = [ l['vnode'] ]
633
634
635        # Open up a temporary file for dot to turn into a visualization
636        try:
637            df, dotname = tempfile.mkstemp()
638            dotfile = os.fdopen(df, 'w')
639        except EnvironmentError:
640            raise service_error(service_error.internal,
641                    "Failed to open file in genviz")
642
643        try:
644            dnull = open('/dev/null', 'w')
645        except EnvironmentError:
646            service_error(service_error.internal,
647                    "Failed to open /dev/null in genviz")
648
649        # Generate a dot/neato input file from the links, nodes and lans
650        try:
651            print >>dotfile, "graph G {"
652            for n in nodes:
653                print >>dotfile, '\t"%s"' % n
654            for l in links.keys():
655                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
656            for l in lans.keys():
657                for n in lans[l]:
658                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
659            print >>dotfile, "}"
660            dotfile.close()
661        except TypeError:
662            raise service_error(service_error.internal,
663                    "Single endpoint link in vtopo")
664        except EnvironmentError:
665            raise service_error(service_error.internal, "Cannot write dot file")
666
667        # Use dot to create a visualization
668        try:
669            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
670                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
671                stderr=dnull, close_fds=True)
672        except EnvironmentError:
673            raise service_error(service_error.internal, 
674                    "Cannot generate visualization: is graphviz available?")
675        dnull.close()
676
677        # Translate dot to vis format
678        vis_nodes = [ ]
679        vis = { 'node': vis_nodes }
680        for line in dot.stdout:
681            m = vis_re.match(line)
682            if m:
683                vn = m.group(1)
684                vis_node = {'name': vn, \
685                        'x': float(m.group(2)),\
686                        'y' : float(m.group(3)),\
687                    }
688                if vn in links.keys() or vn in lans.keys():
689                    vis_node['type'] = 'lan'
690                else:
691                    vis_node['type'] = 'node'
692                vis_nodes.append(vis_node)
693        rv = dot.wait()
694
695        os.remove(dotname)
696        # XXX: graphviz seems to use low return codes for warnings, like
697        # "couldn't find font"
698        if rv < 2 : return vis
699        else: return None
700
701
702    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
703            cert_pwd=None):
704        """
705        Release access to testbed through fedd
706        """
707
708        if not uri and tbmap:
709            uri = tbmap.get(tb, None)
710        if not uri:
711            raise service_error(service_error.server_config, 
712                    "Unknown testbed: %s" % tb)
713
714        if self.local_access.has_key(uri):
715            resp = self.local_access[uri].ReleaseAccess(\
716                    { 'ReleaseAccessRequestBody' : 
717                        {'allocID': {'fedid': aid}},}, 
718                    fedid(file=cert_file))
719            resp = { 'ReleaseAccessResponseBody': resp } 
720        else:
721            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
722                    cert_file, cert_pwd, self.trusted_certs)
723
724        # better error coding
725
726    def remote_ns2topdl(self, uri, desc):
727
728        req = {
729                'description' : { 'ns2description': desc },
730            }
731
732        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
733                self.trusted_certs)
734
735        if r.has_key('Ns2TopdlResponseBody'):
736            r = r['Ns2TopdlResponseBody']
737            ed = r.get('experimentdescription', None)
738            if ed.has_key('topdldescription'):
739                return topdl.Topology(**ed['topdldescription'])
740            else:
741                raise service_error(service_error.protocol, 
742                        "Bad splitter response (no output)")
743        else:
744            raise service_error(service_error.protocol, "Bad splitter response")
745
746    class start_segment:
747        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
748                cert_pwd=None, trusted_certs=None, caller=None,
749                log_collector=None):
750            self.log = log
751            self.debug = debug
752            self.cert_file = cert_file
753            self.cert_pwd = cert_pwd
754            self.trusted_certs = None
755            self.caller = caller
756            self.testbed = testbed
757            self.log_collector = log_collector
758            self.response = None
759            self.node = { }
760            self.subs = { }
761            self.proof = None
762
763        def make_map(self, resp):
764            if 'segmentdescription' not in resp  or \
765                    'topdldescription' not in resp['segmentdescription']:
766                self.log.warn('No topology returned from startsegment')
767                return 
768
769            top = topdl.Topology(
770                    **resp['segmentdescription']['topdldescription'])
771
772            for e in top.elements:
773                if isinstance(e, topdl.Computer):
774                    self.node[e.name] = e
775            for s in top.substrates:
776                self.subs[s.name] = s
777
778        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
779            req = {
780                    'allocID': { 'fedid' : aid }, 
781                    'segmentdescription': { 
782                        'topdldescription': topo.to_dict(),
783                    },
784                }
785
786            if connInfo:
787                req['connection'] = connInfo
788
789            import_svcs = [ s for m in masters.values() \
790                    for s in m if self.testbed in s.importers]
791
792            if import_svcs or self.testbed in masters:
793                req['service'] = []
794
795            for s in import_svcs:
796                for r in s.reqs:
797                    sr = copy.deepcopy(r)
798                    sr['visibility'] = 'import';
799                    req['service'].append(sr)
800
801            for s in masters.get(self.testbed, []):
802                for r in s.reqs:
803                    sr = copy.deepcopy(r)
804                    sr['visibility'] = 'export';
805                    req['service'].append(sr)
806
807            if attrs:
808                req['fedAttr'] = attrs
809
810            try:
811                self.log.debug("Calling StartSegment at %s " % uri)
812                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
813                        self.trusted_certs)
814                if r.has_key('StartSegmentResponseBody'):
815                    lval = r['StartSegmentResponseBody'].get('allocationLog',
816                            None)
817                    if lval and self.log_collector:
818                        for line in  lval.splitlines(True):
819                            self.log_collector.write(line)
820                    self.make_map(r['StartSegmentResponseBody'])
821                    if 'proof' in r: self.proof = r['proof']
822                    self.response = r
823                else:
824                    raise service_error(service_error.internal, 
825                            "Bad response!?: %s" %r)
826                return True
827            except service_error, e:
828                self.log.error("Start segment failed on %s: %s" % \
829                        (self.testbed, e))
830                return False
831
832
833
834    class terminate_segment:
835        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
836                cert_pwd=None, trusted_certs=None, caller=None):
837            self.log = log
838            self.debug = debug
839            self.cert_file = cert_file
840            self.cert_pwd = cert_pwd
841            self.trusted_certs = None
842            self.caller = caller
843            self.testbed = testbed
844
845        def __call__(self, uri, aid ):
846            req = {
847                    'allocID': {'fedid': aid }, 
848                }
849            self.log.info("Calling terminate segment")
850            try:
851                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
852                        self.trusted_certs)
853                self.log.info("Terminate segment succeeded")
854                return True
855            except service_error, e:
856                self.log.error("Terminate segment failed on %s: %s" % \
857                        (self.testbed, e))
858                return False
859
860    class info_segment(start_segment):
861        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
862                cert_pwd=None, trusted_certs=None, caller=None,
863                log_collector=None):
864            experiment_control_local.start_segment.__init__(self, debug, 
865                    log, testbed, cert_file, cert_pwd, trusted_certs, 
866                    caller, log_collector)
867
868        def __call__(self, uri, aid):
869            req = { 'allocID': { 'fedid' : aid } }
870
871            try:
872                self.log.debug("Calling InfoSegment at %s " % uri)
873                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
874                        self.trusted_certs)
875                if r.has_key('InfoSegmentResponseBody'):
876                    self.make_map(r['InfoSegmentResponseBody'])
877                    if 'proof' in r: self.proof = r['proof']
878                    self.response = r
879                else:
880                    raise service_error(service_error.internal, 
881                            "Bad response!?: %s" %r)
882                return True
883            except service_error, e:
884                self.log.error("Info segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888    class operation_segment:
889        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
890                cert_pwd=None, trusted_certs=None, caller=None,
891                log_collector=None):
892            self.log = log
893            self.debug = debug
894            self.cert_file = cert_file
895            self.cert_pwd = cert_pwd
896            self.trusted_certs = None
897            self.caller = caller
898            self.testbed = testbed
899            self.status = None
900
901        def __call__(self, uri, aid, op, targets, params):
902            req = { 
903                    'allocID': { 'fedid' : aid },
904                    'operation': op,
905                    'target': targets,
906                    }
907            if params: req['parameter'] = params
908
909
910            try:
911                self.log.debug("Calling OperationSegment at %s " % uri)
912                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
913                        self.trusted_certs)
914                if 'OperationSegmentResponseBody' in r:
915                    r = r['OperationSegmentResponseBody']
916                    if 'status' in r:
917                        self.status = r['status']
918                else:
919                    raise service_error(service_error.internal, 
920                            "Bad response!?: %s" %r)
921                return True
922            except service_error, e:
923                self.log.error("Operation segment failed on %s: %s" % \
924                        (self.testbed, e))
925                return False
926
927    def annotate_topology(self, top, data):
928        # These routines do various parts of the annotation
929        def add_new_names(nl, l):
930            """ add any names in nl to the list in l """
931            for n in nl:
932                if n not in l: l.append(n)
933       
934        def merge_services(ne, e):
935            for ns in ne.service:
936                # NB: the else is on the for
937                for s in e.service:
938                    if ns.name == s.name:
939                        s.importer = ns.importer
940                        s.param = ns.param
941                        s.description = ns.description
942                        s.status = ns.status
943                        break
944                else:
945                    e.service.append(ns)
946       
947        def merge_oses(ne, e):
948            """
949            Merge the operating system entries of ne into e
950            """
951            for nos in ne.os:
952                # NB: the else is on the for
953                for os in e.os:
954                    if nos.name == os.name:
955                        os.version = nos.version
956                        os.version = nos.distribution
957                        os.version = nos.distributionversion
958                        for a in nos.attribute:
959                            if os.get_attribute(a.attribute):
960                                os.remove_attribute(a.attribute)
961                            os.set_attribute(a.attribute, a.value)
962                        break
963                else:
964                    # If both nodes have one OS, this is a replacement
965                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
966                    else: e.os.append(nos)
967
968        # Annotate the topology with embedding info
969        for e in top.elements:
970            if isinstance(e, topdl.Computer):
971                for s in data:
972                    ne = s.node.get(e.name, None)
973                    if ne is not None:
974                        add_new_names(ne.localname, e.localname)
975                        e.status = ne.status
976                        merge_services(ne, e)
977                        add_new_names(ne.operation, e.operation)
978                        if ne.os: merge_oses(ne, e)
979                        break
980        # Annotate substrates
981        for s in top.substrates:
982            for d in data:
983                ss = d.subs.get(s.name, None)
984                if ss is not None:
985                    if ss.capacity is not None:
986                        s.capacity = ss.capacity
987                    if s.latency is not None:
988                        s.latency = ss.latency
989
990
991
992    def allocate_resources(self, allocated, masters, eid, expid, 
993            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
994            attrs=None, connInfo={}, tbmap=None, expcert=None):
995
996        started = { }           # Testbeds where a sub-experiment started
997                                # successfully
998
999        # XXX
1000        fail_soft = False
1001
1002        if tbmap is None: tbmap = { }
1003
1004        log = alloc_log or self.log
1005
1006        tp = thread_pool(self.nthreads)
1007        threads = [ ]
1008        starters = [ ]
1009
1010        if expcert:
1011            cert = expcert
1012            pw = None
1013        else:
1014            cert = self.cert_file
1015            pw = self.cert_pwd
1016
1017        for tb in allocated.keys():
1018            # Create and start a thread to start the segment, and save it
1019            # to get the return value later
1020            tb_attrs = copy.copy(attrs)
1021            tp.wait_for_slot()
1022            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1023            base, suffix = split_testbed(tb)
1024            if suffix:
1025                tb_attrs.append({'attribute': 'experiment_name', 
1026                    'value': "%s-%s" % (eid, suffix)})
1027            else:
1028                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1029            if not uri:
1030                raise service_error(service_error.internal, 
1031                        "Unknown testbed %s !?" % tb)
1032
1033            aid = tbparams[tb].allocID
1034            if not aid:
1035                raise service_error(service_error.internal, 
1036                        "No alloc id for testbed %s !?" % tb)
1037
1038            s = self.start_segment(log=log, debug=self.debug,
1039                    testbed=tb, cert_file=cert,
1040                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1041                    caller=self.call_StartSegment,
1042                    log_collector=log_collector)
1043            starters.append(s)
1044            t  = pooled_thread(\
1045                    target=s, name=tb,
1046                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1047                    pdata=tp, trace_file=self.trace_file)
1048            threads.append(t)
1049            t.start()
1050
1051        # Wait until all finish (keep pinging the log, though)
1052        mins = 0
1053        revoked = False
1054        while not tp.wait_for_all_done(60.0):
1055            mins += 1
1056            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1057                    % mins)
1058            if not revoked and \
1059                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1060                # a testbed has failed.  Revoke this experiment's
1061                # synchronizarion values so that sub experiments will not
1062                # deadlock waiting for synchronization that will never happen
1063                self.log.info("A subexperiment has failed to swap in, " + \
1064                        "revoking synch keys")
1065                var_key = "fedid:%s" % expid
1066                for k in self.synch_store.all_keys():
1067                    if len(k) > 45 and k[0:46] == var_key:
1068                        self.synch_store.revoke_key(k)
1069                revoked = True
1070
1071        failed = [ t.getName() for t in threads if not t.rv ]
1072        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1073
1074        # If one failed clean up, unless fail_soft is set
1075        if failed:
1076            if not fail_soft:
1077                tp.clear()
1078                for tb in succeeded:
1079                    # Create and start a thread to stop the segment
1080                    tp.wait_for_slot()
1081                    uri = tbparams[tb].uri
1082                    t  = pooled_thread(\
1083                            target=self.terminate_segment(log=log,
1084                                testbed=tb,
1085                                cert_file=cert, 
1086                                cert_pwd=pw,
1087                                trusted_certs=self.trusted_certs,
1088                                caller=self.call_TerminateSegment),
1089                            args=(uri, tbparams[tb].allocID),
1090                            name=tb,
1091                            pdata=tp, trace_file=self.trace_file)
1092                    t.start()
1093                # Wait until all finish (if any are being stopped)
1094                if succeeded:
1095                    tp.wait_for_all_done()
1096
1097                # release the allocations
1098                for tb in tbparams.keys():
1099                    try:
1100                        self.release_access(tb, tbparams[tb].allocID, 
1101                                tbmap=tbmap, uri=tbparams[tb].uri,
1102                                cert_file=cert, cert_pwd=pw)
1103                    except service_error, e:
1104                        self.log.warn("Error releasing access: %s" % e.desc)
1105                # Remove the placeholder
1106                self.state_lock.acquire()
1107                self.state[eid].status = 'failed'
1108                self.state[eid].updated()
1109                if self.state_filename: self.write_state()
1110                self.state_lock.release()
1111                # Remove the repo dir
1112                self.remove_dirs("%s/%s" %(self.repodir, expid))
1113                # Walk up tmpdir, deleting as we go
1114                if self.cleanup:
1115                    self.remove_dirs(tmpdir)
1116                else:
1117                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1118
1119
1120                log.error("Swap in failed on %s" % ",".join(failed))
1121                return
1122        else:
1123            # Walk through the successes and gather the proofs
1124            proofs = { }
1125            for s in starters:
1126                if s.proof:
1127                    proofs[s.testbed] = s.proof
1128            self.annotate_topology(top, starters)
1129            log.info("[start_segment]: Experiment %s active" % eid)
1130
1131
1132        # Walk up tmpdir, deleting as we go
1133        if self.cleanup:
1134            self.remove_dirs(tmpdir)
1135        else:
1136            log.debug("[start_experiment]: not removing %s" % tmpdir)
1137
1138        # Insert the experiment into our state and update the disk copy.
1139        self.state_lock.acquire()
1140        self.state[expid].status = 'active'
1141        self.state[eid] = self.state[expid]
1142        self.state[eid].top = top
1143        self.state[eid].updated()
1144        # Append startup proofs
1145        for f in self.state[eid].get_all_allocations():
1146            if f.tb in proofs:
1147                f.proof.append(proofs[f.tb])
1148
1149        if self.state_filename: self.write_state()
1150        self.state_lock.release()
1151        return
1152
1153
1154    def add_kit(self, e, kit):
1155        """
1156        Add a Software object created from the list of (install, location)
1157        tuples passed as kit  to the software attribute of an object e.  We
1158        do this enough to break out the code, but it's kind of a hack to
1159        avoid changing the old tuple rep.
1160        """
1161
1162        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1163
1164        if isinstance(e.software, list): e.software.extend(s)
1165        else: e.software = s
1166
1167    def append_experiment_authorization(self, expid, attrs, 
1168            need_state_lock=True):
1169        """
1170        Append the authorization information to system state
1171        """
1172
1173        for p, a in attrs:
1174            self.auth.set_attribute(p, a)
1175        self.auth.save()
1176
1177        if need_state_lock: self.state_lock.acquire()
1178        # XXX: really a no op?
1179        #self.state[expid]['auth'].update(attrs)
1180        if self.state_filename: self.write_state()
1181        if need_state_lock: self.state_lock.release()
1182
1183    def clear_experiment_authorization(self, expid, need_state_lock=True):
1184        """
1185        Attrs is a set of attribute principal pairs that need to be removed
1186        from the authenticator.  Remove them and save the authenticator.
1187        """
1188
1189        if need_state_lock: self.state_lock.acquire()
1190        # XXX: should be a no-op
1191        #if expid in self.state and 'auth' in self.state[expid]:
1192            #for p, a in self.state[expid]['auth']:
1193                #self.auth.unset_attribute(p, a)
1194            #self.state[expid]['auth'] = set()
1195        if self.state_filename: self.write_state()
1196        if need_state_lock: self.state_lock.release()
1197        self.auth.save()
1198
1199
1200    def create_experiment_state(self, fid, req, expid, expcert,
1201            state='starting'):
1202        """
1203        Create the initial entry in the experiment's state.  The expid and
1204        expcert are the experiment's fedid and certifacte that represents that
1205        ID, which are installed in the experiment state.  If the request
1206        includes a suggested local name that is used if possible.  If the local
1207        name is already taken by an experiment owned by this user that has
1208        failed, it is overwritten.  Otherwise new letters are added until a
1209        valid localname is found.  The generated local name is returned.
1210        """
1211
1212        if req.has_key('experimentID') and \
1213                req['experimentID'].has_key('localname'):
1214            overwrite = False
1215            eid = req['experimentID']['localname']
1216            # If there's an old failed experiment here with the same local name
1217            # and accessible by this user, we'll overwrite it, otherwise we'll
1218            # fall through and do the collision avoidance.
1219            old_expid = self.get_experiment_fedid(eid)
1220            if old_expid:
1221                users_experiment = True
1222                try:
1223                    self.check_experiment_access(fid, old_expid)
1224                except service_error, e:
1225                    if e.code == service_error.access: users_experiment = False
1226                    else: raise e
1227                if users_experiment:
1228                    self.state_lock.acquire()
1229                    status = self.state[eid].status
1230                    if status and status == 'failed':
1231                        # remove the old access attributes
1232                        self.clear_experiment_authorization(eid,
1233                                need_state_lock=False)
1234                        overwrite = True
1235                        del self.state[eid]
1236                        del self.state[old_expid]
1237                    self.state_lock.release()
1238                else:
1239                    self.log.info('Experiment %s exists, ' % eid + \
1240                            'but this user cannot access it')
1241            self.state_lock.acquire()
1242            while (self.state.has_key(eid) and not overwrite):
1243                eid += random.choice(string.ascii_letters)
1244            # Initial state
1245            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1246                    identity=expcert)
1247            self.state[expid] = self.state[eid]
1248            if self.state_filename: self.write_state()
1249            self.state_lock.release()
1250        else:
1251            eid = self.exp_stem
1252            for i in range(0,5):
1253                eid += random.choice(string.ascii_letters)
1254            self.state_lock.acquire()
1255            while (self.state.has_key(eid)):
1256                eid = self.exp_stem
1257                for i in range(0,5):
1258                    eid += random.choice(string.ascii_letters)
1259            # Initial state
1260            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1261                    identity=expcert)
1262            self.state[expid] = self.state[eid]
1263            if self.state_filename: self.write_state()
1264            self.state_lock.release()
1265
1266        # Let users touch the state.  Authorize this fid and the expid itself
1267        # to touch the experiment, as well as allowing th eoverrides.
1268        self.append_experiment_authorization(eid, 
1269                set([(fid, expid), (expid,expid)] + \
1270                        [ (o, expid) for o in self.overrides]))
1271
1272        return eid
1273
1274
1275    def allocate_ips_to_topo(self, top):
1276        """
1277        Add an ip4_address attribute to all the hosts in the topology, based on
1278        the shared substrates on which they sit.  An /etc/hosts file is also
1279        created and returned as a list of hostfiles entries.  We also return
1280        the allocator, because we may need to allocate IPs to portals
1281        (specifically DRAGON portals).
1282        """
1283        subs = sorted(top.substrates, 
1284                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1285                reverse=True)
1286        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1287        ifs = { }
1288        hosts = [ ]
1289
1290        for idx, s in enumerate(subs):
1291            net_size = len(s.interfaces)+2
1292
1293            a = ips.allocate(net_size)
1294            if a :
1295                base, num = a
1296                if num < net_size: 
1297                    raise service_error(service_error.internal,
1298                            "Allocator returned wrong number of IPs??")
1299            else:
1300                raise service_error(service_error.req, 
1301                        "Cannot allocate IP addresses")
1302            mask = ips.min_alloc
1303            while mask < net_size:
1304                mask *= 2
1305
1306            netmask = ((2**32-1) ^ (mask-1))
1307
1308            base += 1
1309            for i in s.interfaces:
1310                i.attribute.append(
1311                        topdl.Attribute('ip4_address', 
1312                            "%s" % ip_addr(base)))
1313                i.attribute.append(
1314                        topdl.Attribute('ip4_netmask', 
1315                            "%s" % ip_addr(int(netmask))))
1316
1317                hname = i.element.name
1318                if ifs.has_key(hname):
1319                    hosts.append("%s\t%s-%s %s-%d" % \
1320                            (ip_addr(base), hname, s.name, hname,
1321                                ifs[hname]))
1322                else:
1323                    ifs[hname] = 0
1324                    hosts.append("%s\t%s-%s %s-%d %s" % \
1325                            (ip_addr(base), hname, s.name, hname,
1326                                ifs[hname], hname))
1327
1328                ifs[hname] += 1
1329                base += 1
1330        return hosts, ips
1331
1332    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1333            tbparam, masters, tbmap, expid=None, expcert=None):
1334        for tb in testbeds:
1335            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1336                    expcert)
1337            allocated[tb] = 1
1338
1339    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1340            expcert=None):
1341        """
1342        Get access to testbed through fedd and set the parameters for that tb
1343        """
1344        def get_export_project(svcs):
1345            """
1346            Look through for the list of federated_service for this testbed
1347            objects for a project_export service, and extract the project
1348            parameter.
1349            """
1350
1351            pe = [s for s in svcs if s.name=='project_export']
1352            if len(pe) == 1:
1353                return pe[0].params.get('project', None)
1354            elif len(pe) == 0:
1355                return None
1356            else:
1357                raise service_error(service_error.req,
1358                        "More than one project export is not supported")
1359
1360        def add_services(svcs, type, slist, keys):
1361            """
1362            Add the given services to slist.  type is import or export.  Also
1363            add a mapping entry from the assigned id to the original service
1364            record.
1365            """
1366            for i, s in enumerate(svcs):
1367                idx = '%s%d' % (type, i)
1368                keys[idx] = s
1369                sr = {'id': idx, 'name': s.name, 'visibility': type }
1370                if s.params:
1371                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1372                            for k, v in s.params.items()]
1373                slist.append(sr)
1374
1375        uri = tbmap.get(testbed_base(tb), None)
1376        if not uri:
1377            raise service_error(service_error.server_config, 
1378                    "Unknown testbed: %s" % tb)
1379
1380        export_svcs = masters.get(tb,[])
1381        import_svcs = [ s for m in masters.values() \
1382                for s in m \
1383                    if tb in s.importers ]
1384
1385        export_project = get_export_project(export_svcs)
1386        # Compose the credential list so that IDs come before attributes
1387        creds = set()
1388        keys = set()
1389        certs = self.auth.get_creds_for_principal(fid)
1390        if expid:
1391            certs.update(self.auth.get_creds_for_principal(expid))
1392        for c in certs:
1393            keys.add(c.issuer_cert())
1394            creds.add(c.attribute_cert())
1395        creds = list(keys) + list(creds)
1396
1397        if expcert: cert, pw = expcert, None
1398        else: cert, pw = self.cert_file, self.cert_pw
1399
1400        # Request credentials
1401        req = {
1402                'abac_credential': creds,
1403            }
1404        # Make the service request from the services we're importing and
1405        # exporting.  Keep track of the export request ids so we can
1406        # collect the resulting info from the access response.
1407        e_keys = { }
1408        if import_svcs or export_svcs:
1409            slist = []
1410            add_services(import_svcs, 'import', slist, e_keys)
1411            add_services(export_svcs, 'export', slist, e_keys)
1412            req['service'] = slist
1413
1414        if self.local_access.has_key(uri):
1415            # Local access call
1416            req = { 'RequestAccessRequestBody' : req }
1417            r = self.local_access[uri].RequestAccess(req, 
1418                    fedid(file=self.cert_file))
1419            r = { 'RequestAccessResponseBody' : r }
1420        else:
1421            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1422
1423        if r.has_key('RequestAccessResponseBody'):
1424            # Through to here we have a valid response, not a fault.
1425            # Access denied is a fault, so something better or worse than
1426            # access denied has happened.
1427            r = r['RequestAccessResponseBody']
1428            self.log.debug("[get_access] Access granted")
1429        else:
1430            raise service_error(service_error.protocol,
1431                        "Bad proxy response")
1432        if 'proof' not in r:
1433            raise service_error(service_error.protocol,
1434                        "Bad access response (no access proof)")
1435
1436        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1437                tb=tb, uri=uri, proof=[r['proof']], 
1438                services=masters.get(tb, None))
1439
1440        # Collect the responses corresponding to the services this testbed
1441        # exports.  These will be the service requests that we will include in
1442        # the start segment requests (with appropriate visibility values) to
1443        # import and export the segments.
1444        for s in r.get('service', []):
1445            id = s.get('id', None)
1446            # Note that this attaches the response to the object in the masters
1447            # data structure.  (The e_keys index disappears when this fcn
1448            # returns)
1449            if id and id in e_keys:
1450                e_keys[id].reqs.append(s)
1451
1452        # Add attributes to parameter space.  We don't allow attributes to
1453        # overlay any parameters already installed.
1454        for a in r.get('fedAttr', []):
1455            try:
1456                if a['attribute']:
1457                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1458            except KeyError:
1459                self.log.error("Bad attribute in response: %s" % a)
1460
1461
1462    def split_topology(self, top, topo, testbeds):
1463        """
1464        Create the sub-topologies that are needed for experiment instantiation.
1465        """
1466        for tb in testbeds:
1467            topo[tb] = top.clone()
1468            # copy in for loop allows deletions from the original
1469            for e in [ e for e in topo[tb].elements]:
1470                etb = e.get_attribute('testbed')
1471                # NB: elements without a testbed attribute won't appear in any
1472                # sub topologies. 
1473                if not etb or etb != tb:
1474                    for i in e.interface:
1475                        for s in i.subs:
1476                            try:
1477                                s.interfaces.remove(i)
1478                            except ValueError:
1479                                raise service_error(service_error.internal,
1480                                        "Can't remove interface??")
1481                    topo[tb].elements.remove(e)
1482            topo[tb].make_indices()
1483
1484    def confirm_software(self, top):
1485        """
1486        Make sure that the software to be loaded in the topo is all available
1487        before we begin making access requests, etc.  This is a subset of
1488        wrangle_software.
1489        """
1490        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1491        pkgs.update([x.location for e in top.elements for x in e.software])
1492
1493        for pkg in pkgs:
1494            loc = pkg
1495
1496            scheme, host, path = urlparse(loc)[0:3]
1497            dest = os.path.basename(path)
1498            if not scheme:
1499                if not loc.startswith('/'):
1500                    loc = "/%s" % loc
1501                loc = "file://%s" %loc
1502            # NB: if scheme was found, loc == pkg
1503            try:
1504                u = urlopen(loc)
1505                u.close()
1506            except Exception, e:
1507                raise service_error(service_error.req, 
1508                        "Cannot open %s: %s" % (loc, e))
1509        return True
1510
1511    def wrangle_software(self, expid, top, topo, tbparams):
1512        """
1513        Copy software out to the repository directory, allocate permissions and
1514        rewrite the segment topologies to look for the software in local
1515        places.
1516        """
1517
1518        # Copy the rpms and tarfiles to a distribution directory from
1519        # which the federants can retrieve them
1520        linkpath = "%s/software" %  expid
1521        softdir ="%s/%s" % ( self.repodir, linkpath)
1522        softmap = { }
1523
1524        # self.fedkit and self.gateway kit are lists of tuples of
1525        # (install_location, download_location) this extracts the download
1526        # locations.
1527        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1528        pkgs.update([x.location for e in top.elements for x in e.software])
1529        try:
1530            os.makedirs(softdir)
1531        except EnvironmentError, e:
1532            raise service_error(
1533                    "Cannot create software directory: %s" % e)
1534        # The actual copying.  Everything's converted into a url for copying.
1535        auth_attrs = set()
1536        for pkg in pkgs:
1537            loc = pkg
1538
1539            scheme, host, path = urlparse(loc)[0:3]
1540            dest = os.path.basename(path)
1541            if not scheme:
1542                if not loc.startswith('/'):
1543                    loc = "/%s" % loc
1544                loc = "file://%s" %loc
1545            # NB: if scheme was found, loc == pkg
1546            try:
1547                u = urlopen(loc)
1548            except Exception, e:
1549                raise service_error(service_error.req, 
1550                        "Cannot open %s: %s" % (loc, e))
1551            try:
1552                f = open("%s/%s" % (softdir, dest) , "w")
1553                self.log.debug("Writing %s/%s" % (softdir,dest) )
1554                data = u.read(4096)
1555                while data:
1556                    f.write(data)
1557                    data = u.read(4096)
1558                f.close()
1559                u.close()
1560            except Exception, e:
1561                raise service_error(service_error.internal,
1562                        "Could not copy %s: %s" % (loc, e))
1563            path = re.sub("/tmp", "", linkpath)
1564            # XXX
1565            softmap[pkg] = \
1566                    "%s/%s/%s" %\
1567                    ( self.repo_url, path, dest)
1568
1569            # Allow the individual segments to access the software by assigning
1570            # an attribute to each testbed allocation that encodes the data to
1571            # be released.  This expression collects the data for each run of
1572            # the loop.
1573            auth_attrs.update([
1574                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1575                        for tb in tbparams.keys()])
1576
1577        self.append_experiment_authorization(expid, auth_attrs)
1578
1579        # Convert the software locations in the segments into the local
1580        # copies on this host
1581        for soft in [ s for tb in topo.values() \
1582                for e in tb.elements \
1583                    if getattr(e, 'software', False) \
1584                        for s in e.software ]:
1585            if softmap.has_key(soft.location):
1586                soft.location = softmap[soft.location]
1587
1588
1589    def new_experiment(self, req, fid):
1590        """
1591        The external interface to empty initial experiment creation called from
1592        the dispatcher.
1593
1594        Creates a working directory, splits the incoming description using the
1595        splitter script and parses out the avrious subsections using the
1596        lcasses above.  Once each sub-experiment is created, use pooled threads
1597        to instantiate them and start it all up.
1598        """
1599        self.log.info("New experiment call started for %s" % fid)
1600        req = req.get('NewRequestBody', None)
1601        if not req:
1602            raise service_error(service_error.req,
1603                    "Bad request format (no NewRequestBody)")
1604
1605        if self.auth.import_credentials(data_list=req.get('credential', [])):
1606            self.auth.save()
1607
1608        try:
1609            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1610                    with_proof=True)
1611        except service_error, e:
1612            self.log.info("New experiment call for %s: access denied" % fid)
1613            raise e
1614
1615
1616        if not access_ok:
1617            self.log.info("New experiment call for %s: Access denied" % fid)
1618            raise service_error(service_error.access, "New access denied",
1619                    proof=[proof])
1620
1621        try:
1622            tmpdir = tempfile.mkdtemp(prefix="split-")
1623        except EnvironmentError:
1624            raise service_error(service_error.internal, "Cannot create tmp dir")
1625
1626        # Generate an ID for the experiment (slice) and a certificate that the
1627        # allocator can use to prove they own it.  We'll ship it back through
1628        # the encrypted connection.  If the requester supplied one, use it.
1629        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1630            expcert = req['experimentAccess']['X509']
1631            expid = fedid(certstr=expcert)
1632            self.state_lock.acquire()
1633            if expid in self.state:
1634                self.state_lock.release()
1635                raise service_error(service_error.req, 
1636                        'fedid %s identifies an existing experiment' % expid)
1637            self.state_lock.release()
1638        else:
1639            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1640
1641        #now we're done with the tmpdir, and it should be empty
1642        if self.cleanup:
1643            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1644            os.rmdir(tmpdir)
1645        else:
1646            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1647
1648        eid = self.create_experiment_state(fid, req, expid, expcert, 
1649                state='empty')
1650
1651        rv = {
1652                'experimentID': [
1653                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1654                ],
1655                'experimentStatus': 'empty',
1656                'experimentAccess': { 'X509' : expcert },
1657                'proof': proof.to_dict(),
1658            }
1659
1660        self.log.info("New experiment call succeeded for %s" % fid)
1661        return rv
1662
1663    # create_experiment sub-functions
1664
1665    @staticmethod
1666    def get_experiment_key(req, field='experimentID'):
1667        """
1668        Parse the experiment identifiers out of the request (the request body
1669        tag has been removed).  Specifically this pulls either the fedid or the
1670        localname out of the experimentID field.  A fedid is preferred.  If
1671        neither is present or the request does not contain the fields,
1672        service_errors are raised.
1673        """
1674        # Get the experiment access
1675        exp = req.get(field, None)
1676        if exp:
1677            if exp.has_key('fedid'):
1678                key = exp['fedid']
1679            elif exp.has_key('localname'):
1680                key = exp['localname']
1681            else:
1682                raise service_error(service_error.req, "Unknown lookup type")
1683        else:
1684            raise service_error(service_error.req, "No request?")
1685
1686        return key
1687
1688    def get_experiment_ids_and_start(self, key, tmpdir):
1689        """
1690        Get the experiment name, id and access certificate from the state, and
1691        set the experiment state to 'starting'.  returns a triple (fedid,
1692        localname, access_cert_file). The access_cert_file is a copy of the
1693        contents of the access certificate, created in the tempdir with
1694        restricted permissions.  If things are confused, raise an exception.
1695        """
1696
1697        expid = eid = None
1698        self.state_lock.acquire()
1699        if key in self.state:
1700            exp = self.state[key]
1701            exp.status = "starting"
1702            exp.updated()
1703            expid = exp.fedid
1704            eid = exp.localname
1705            expcert = exp.identity
1706        self.state_lock.release()
1707
1708        # make a protected copy of the access certificate so the experiment
1709        # controller can act as the experiment principal.
1710        if expcert:
1711            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1712            if not expcert_file:
1713                raise service_error(service_error.internal, 
1714                        "Cannot create temp cert file?")
1715        else:
1716            expcert_file = None
1717
1718        return (eid, expid, expcert_file)
1719
1720    def get_topology(self, req, tmpdir):
1721        """
1722        Get the ns2 content and put it into a file for parsing.  Call the local
1723        or remote parser and return the topdl.Topology.  Errors result in
1724        exceptions.  req is the request and tmpdir is a work directory.
1725        """
1726
1727        # The tcl parser needs to read a file so put the content into that file
1728        descr=req.get('experimentdescription', None)
1729        if descr:
1730            if 'ns2description' in descr:
1731                file_content=descr['ns2description']
1732            elif 'topdldescription' in descr:
1733                return topdl.Topology(**descr['topdldescription'])
1734            else:
1735                raise service_error(service_error.req, 
1736                        'Unknown experiment description type')
1737        else:
1738            raise service_error(service_error.req, "No experiment description")
1739
1740
1741        if self.splitter_url:
1742            self.log.debug("Calling remote topdl translator at %s" % \
1743                    self.splitter_url)
1744            top = self.remote_ns2topdl(self.splitter_url, file_content)
1745        else:
1746            tclfile = os.path.join(tmpdir, "experiment.tcl")
1747            if file_content:
1748                try:
1749                    f = open(tclfile, 'w')
1750                    f.write(file_content)
1751                    f.close()
1752                except EnvironmentError:
1753                    raise service_error(service_error.internal,
1754                            "Cannot write temp experiment description")
1755            else:
1756                raise service_error(service_error.req, 
1757                        "Only ns2descriptions supported")
1758            pid = "dummy"
1759            gid = "dummy"
1760            eid = "dummy"
1761
1762            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1763                str(self.muxmax), '-m', 'dummy']
1764
1765            tclcmd.extend([pid, gid, eid, tclfile])
1766
1767            self.log.debug("running local splitter %s", " ".join(tclcmd))
1768            # This is just fantastic.  As a side effect the parser copies
1769            # tb_compat.tcl into the current directory, so that directory
1770            # must be writable by the fedd user.  Doing this in the
1771            # temporary subdir ensures this is the case.
1772            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1773                    cwd=tmpdir)
1774            split_data = tclparser.stdout
1775
1776            top = topdl.topology_from_xml(file=split_data, top="experiment")
1777            os.remove(tclfile)
1778
1779        return top
1780
1781    def get_testbed_services(self, req, testbeds):
1782        """
1783        Parse the services section of the request into two dicts mapping
1784        testbed to lists of federated_service objects.  The first dict maps all
1785        exporters of services to those service objects, the second maps
1786        testbeds to service objects only for services requiring portals.
1787        """
1788        # We construct both dicts here because deriving the second is more
1789        # comples than it looks - both the keys and lists can differ, and it's
1790        # much easier to generate both in one pass.
1791        masters = { }
1792        pmasters = { }
1793        for s in req.get('service', []):
1794            # If this is a service request with the importall field
1795            # set, fill it out.
1796
1797            if s.get('importall', False):
1798                s['import'] = [ tb for tb in testbeds \
1799                        if tb not in s.get('export',[])]
1800                del s['importall']
1801
1802            # Add the service to masters
1803            for tb in s.get('export', []):
1804                if s.get('name', None):
1805
1806                    params = { }
1807                    for a in s.get('fedAttr', []):
1808                        params[a.get('attribute', '')] = a.get('value','')
1809
1810                    fser = federated_service(name=s['name'],
1811                            exporter=tb, importers=s.get('import',[]),
1812                            params=params)
1813                    if fser.name == 'hide_hosts' \
1814                            and 'hosts' not in fser.params:
1815                        fser.params['hosts'] = \
1816                                ",".join(tb_hosts.get(fser.exporter, []))
1817                    if tb in masters: masters[tb].append(fser)
1818                    else: masters[tb] = [fser]
1819
1820                    if fser.portal:
1821                        if tb in pmasters: pmasters[tb].append(fser)
1822                        else: pmasters[tb] = [fser]
1823                else:
1824                    self.log.error('Testbed service does not have name " + \
1825                            "and importers')
1826        return masters, pmasters
1827
1828    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1829        """
1830        Create the ssh keys necessary for interconnecting the portal nodes and
1831        the global hosts file for letting each segment know about the IP
1832        addresses in play.  Save these into the repo.  Add attributes to the
1833        autorizer allowing access controllers to download them and return a set
1834        of attributes that inform the segments where to find this stuff.  May
1835        raise service_errors in if there are problems.
1836        """
1837        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1838        gw_secretkey_base = "fed.%s" % self.ssh_type
1839        keydir = os.path.join(tmpdir, 'keys')
1840        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1841        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1842
1843        try:
1844            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1845        except ValueError:
1846            raise service_error(service_error.server_config, 
1847                    "Bad key type (%s)" % self.ssh_type)
1848
1849        self.generate_seer_certs(keydir)
1850
1851        # Copy configuration files into the remote file store
1852        # The config urlpath
1853        configpath = "/%s/config" % expid
1854        # The config file system location
1855        configdir ="%s%s" % ( self.repodir, configpath)
1856        try:
1857            os.makedirs(configdir)
1858        except EnvironmentError, e:
1859            raise service_error(service_error.internal,
1860                    "Cannot create config directory: %s" % e)
1861        try:
1862            f = open("%s/hosts" % configdir, "w")
1863            print >> f, string.join(hosts, '\n')
1864            f.close()
1865        except EnvironmentError, e:
1866            raise service_error(service_error.internal, 
1867                    "Cannot write hosts file: %s" % e)
1868        try:
1869            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1870            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1871            copy_file(os.path.join(keydir, 'ca.pem'), 
1872                    os.path.join(configdir, 'ca.pem'))
1873            copy_file(os.path.join(keydir, 'node.pem'), 
1874                    os.path.join(configdir, 'node.pem'))
1875        except EnvironmentError, e:
1876            raise service_error(service_error.internal, 
1877                    "Cannot copy keyfiles: %s" % e)
1878
1879        # Allow the individual testbeds to access the configuration files,
1880        # again by setting an attribute for the relevant pathnames on each
1881        # allocation principal.  Yeah, that's a long list comprehension.
1882        self.append_experiment_authorization(expid, set([
1883            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1884                    for tb in tbparams.keys() \
1885                        for f in ("hosts", 'ca.pem', 'node.pem', 
1886                            gw_secretkey_base, gw_pubkey_base)]))
1887
1888        attrs = [ 
1889                {
1890                    'attribute': 'ssh_pubkey', 
1891                    'value': '%s/%s/config/%s' % \
1892                            (self.repo_url, expid, gw_pubkey_base)
1893                },
1894                {
1895                    'attribute': 'ssh_secretkey', 
1896                    'value': '%s/%s/config/%s' % \
1897                            (self.repo_url, expid, gw_secretkey_base)
1898                },
1899                {
1900                    'attribute': 'hosts', 
1901                    'value': '%s/%s/config/hosts' % \
1902                            (self.repo_url, expid)
1903                },
1904                {
1905                    'attribute': 'seer_ca_pem', 
1906                    'value': '%s/%s/config/%s' % \
1907                            (self.repo_url, expid, 'ca.pem')
1908                },
1909                {
1910                    'attribute': 'seer_node_pem', 
1911                    'value': '%s/%s/config/%s' % \
1912                            (self.repo_url, expid, 'node.pem')
1913                },
1914            ]
1915        return attrs
1916
1917
1918    def get_vtopo(self, req, fid):
1919        """
1920        Return the stored virtual topology for this experiment
1921        """
1922        rv = None
1923        state = None
1924        self.log.info("vtopo call started for %s" %  fid)
1925
1926        req = req.get('VtopoRequestBody', None)
1927        if not req:
1928            raise service_error(service_error.req,
1929                    "Bad request format (no VtopoRequestBody)")
1930        exp = req.get('experiment', None)
1931        if exp:
1932            if exp.has_key('fedid'):
1933                key = exp['fedid']
1934                keytype = "fedid"
1935            elif exp.has_key('localname'):
1936                key = exp['localname']
1937                keytype = "localname"
1938            else:
1939                raise service_error(service_error.req, "Unknown lookup type")
1940        else:
1941            raise service_error(service_error.req, "No request?")
1942
1943        try:
1944            proof = self.check_experiment_access(fid, key)
1945        except service_error, e:
1946            self.log.info("vtopo call failed for %s: access denied" %  fid)
1947            raise e
1948
1949        self.state_lock.acquire()
1950        # XXX: this needs to be recalculated
1951        if key in self.state:
1952            if self.state[key].top is not None:
1953                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1954                rv = { 'experiment' : {keytype: key },
1955                        'vtopo': vtopo,
1956                        'proof': proof.to_dict(), 
1957                    }
1958            else:
1959                state = self.state[key].status
1960        self.state_lock.release()
1961
1962        if rv: 
1963            self.log.info("vtopo call completed for %s %s " % \
1964                (key, fid))
1965            return rv
1966        else: 
1967            if state:
1968                self.log.info("vtopo call completed for %s %s (Not ready)" % \
1969                    (key, fid))
1970                raise service_error(service_error.partial, 
1971                        "Not ready: %s" % state)
1972            else:
1973                self.log.info("vtopo call completed for %s %s (No experiment)"\
1974                        % (key, fid))
1975                raise service_error(service_error.req, "No such experiment")
1976
1977    def get_vis(self, req, fid):
1978        """
1979        Return the stored visualization for this experiment
1980        """
1981        rv = None
1982        state = None
1983
1984        self.log.info("vis call started for %s" %  fid)
1985        req = req.get('VisRequestBody', None)
1986        if not req:
1987            raise service_error(service_error.req,
1988                    "Bad request format (no VisRequestBody)")
1989        exp = req.get('experiment', None)
1990        if exp:
1991            if exp.has_key('fedid'):
1992                key = exp['fedid']
1993                keytype = "fedid"
1994            elif exp.has_key('localname'):
1995                key = exp['localname']
1996                keytype = "localname"
1997            else:
1998                raise service_error(service_error.req, "Unknown lookup type")
1999        else:
2000            raise service_error(service_error.req, "No request?")
2001
2002        try:
2003            proof = self.check_experiment_access(fid, key)
2004        except service_error, e:
2005            self.log.info("vis call failed for %s: access denied" %  fid)
2006            raise e
2007
2008        self.state_lock.acquire()
2009        # Generate the visualization
2010        if key in self.state:
2011            if self.state[key].top is not None:
2012                try:
2013                    vis = self.genviz(
2014                            topdl.topology_to_vtopo(self.state[key].top))
2015                except service_error, e:
2016                    self.state_lock.release()
2017                    raise e
2018                rv =  { 'experiment' : {keytype: key },
2019                        'vis': vis,
2020                        'proof': proof.to_dict(), 
2021                        }
2022            else:
2023                state = self.state[key].status
2024        self.state_lock.release()
2025
2026        if rv: 
2027            self.log.info("vis call completed for %s %s " % \
2028                (key, fid))
2029            return rv
2030        else:
2031            if state:
2032                self.log.info("vis call completed for %s %s (not ready)" % \
2033                    (key, fid))
2034                raise service_error(service_error.partial, 
2035                        "Not ready: %s" % state)
2036            else:
2037                self.log.info("vis call completed for %s %s (no experiment)" % \
2038                    (key, fid))
2039                raise service_error(service_error.req, "No such experiment")
2040
2041   
2042    def save_federant_information(self, allocated, tbparams, eid, top):
2043        """
2044        Store the various data that have changed in the experiment state
2045        between when it was started and the beginning of resource allocation.
2046        This is basically the information about each local allocation.  This
2047        fills in the values of the placeholder allocation in the state.  It
2048        also collects the access proofs and returns them as dicts for a
2049        response message.
2050        """
2051        self.state_lock.acquire()
2052        exp = self.state[eid]
2053        exp.top = top.clone()
2054        # save federant information
2055        for k in allocated.keys():
2056            exp.add_allocation(tbparams[k])
2057            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2058                type="testbed", localname=[k], 
2059                service=[ s.to_topdl() for s in tbparams[k].services]))
2060
2061        # Access proofs for the response message
2062        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2063                    for p in tbparams[k].proof]
2064        exp.updated()
2065        if self.state_filename: 
2066            self.write_state()
2067        self.state_lock.release()
2068        return proofs
2069
2070    def clear_placeholder(self, eid, expid, tmpdir):
2071        """
2072        Clear the placeholder and remove any allocated temporary dir.
2073        """
2074
2075        self.state_lock.acquire()
2076        del self.state[eid]
2077        del self.state[expid]
2078        if self.state_filename: self.write_state()
2079        self.state_lock.release()
2080        if tmpdir and self.cleanup:
2081            self.remove_dirs(tmpdir)
2082
2083    # end of create_experiment sub-functions
2084
2085    def create_experiment(self, req, fid):
2086        """
2087        The external interface to experiment creation called from the
2088        dispatcher.
2089
2090        Creates a working directory, splits the incoming description using the
2091        splitter script and parses out the various subsections using the
2092        classes above.  Once each sub-experiment is created, use pooled threads
2093        to instantiate them and start it all up.
2094        """
2095
2096        self.log.info("Create experiment call started for %s" % fid)
2097        req = req.get('CreateRequestBody', None)
2098        if req:
2099            key = self.get_experiment_key(req)
2100        else:
2101            raise service_error(service_error.req,
2102                    "Bad request format (no CreateRequestBody)")
2103
2104        # Import information from the requester
2105        if self.auth.import_credentials(data_list=req.get('credential', [])):
2106            self.auth.save()
2107        else:
2108            self.log.debug("Failed to import delegation credentials(!)")
2109
2110        try:
2111            # Make sure that the caller can talk to us
2112            proof = self.check_experiment_access(fid, key)
2113        except service_error, e:
2114            self.log.info("Create experiment call failed for %s: access denied"\
2115                    % fid)
2116            raise e
2117
2118
2119        # Install the testbed map entries supplied with the request into a copy
2120        # of the testbed map.
2121        tbmap = dict(self.tbmap)
2122        tbactive = set(self.tbactive)
2123        for m in req.get('testbedmap', []):
2124            if 'testbed' in m and 'uri' in m:
2125                tbmap[m['testbed']] = m['uri']
2126                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2127
2128        # a place to work
2129        try:
2130            tmpdir = tempfile.mkdtemp(prefix="split-")
2131            os.mkdir(tmpdir+"/keys")
2132        except EnvironmentError:
2133            raise service_error(service_error.internal, "Cannot create tmp dir")
2134
2135        tbparams = { }
2136
2137        eid, expid, expcert_file = \
2138                self.get_experiment_ids_and_start(key, tmpdir)
2139
2140        # This catches exceptions to clear the placeholder if necessary
2141        try: 
2142            if not (eid and expid):
2143                raise service_error(service_error.internal, 
2144                        "Cannot find local experiment info!?")
2145
2146            top = self.get_topology(req, tmpdir)
2147            self.confirm_software(top)
2148            # Assign the IPs
2149            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2150            # Find the testbeds to look up
2151            tb_hosts = { }
2152            testbeds = [ ]
2153            for e in top.elements:
2154                if isinstance(e, topdl.Computer):
2155                    tb = e.get_attribute('testbed') or 'default'
2156                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2157                    else: 
2158                        tb_hosts[tb] = [ e.name ]
2159                        testbeds.append(tb)
2160
2161            masters, pmasters = self.get_testbed_services(req, testbeds)
2162            allocated = { }         # Testbeds we can access
2163            topo ={ }               # Sub topologies
2164            connInfo = { }          # Connection information
2165
2166            self.split_topology(top, topo, testbeds)
2167
2168            self.get_access_to_testbeds(testbeds, fid, allocated, 
2169                    tbparams, masters, tbmap, expid, expcert_file)
2170
2171            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2172
2173            part = experiment_partition(self.auth, self.store_url, tbmap,
2174                    self.muxmax, self.direct_transit, tbactive)
2175            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2176                    connInfo, expid)
2177
2178            auth_attrs = set()
2179            # Now get access to the dynamic testbeds (those added above)
2180            for tb in [ t for t in topo if t not in allocated]:
2181                self.get_access(tb, tbparams, fid, masters, tbmap, 
2182                        expid, expcert_file)
2183                allocated[tb] = 1
2184                store_keys = topo[tb].get_attribute('store_keys')
2185                # Give the testbed access to keys it exports or imports
2186                if store_keys:
2187                    auth_attrs.update(set([
2188                        (tbparams[tb].allocID, sk) \
2189                                for sk in store_keys.split(" ")]))
2190
2191            if auth_attrs:
2192                self.append_experiment_authorization(expid, auth_attrs)
2193
2194            # transit and disconnected testbeds may not have a connInfo entry.
2195            # Fill in the blanks.
2196            for t in allocated.keys():
2197                if not connInfo.has_key(t):
2198                    connInfo[t] = { }
2199
2200            self.wrangle_software(expid, top, topo, tbparams)
2201
2202            proofs = self.save_federant_information(allocated, tbparams, 
2203                    eid, top)
2204        except service_error, e:
2205            # If something goes wrong in the parse (usually an access error)
2206            # clear the placeholder state.  From here on out the code delays
2207            # exceptions.  Failing at this point returns a fault to the remote
2208            # caller.
2209
2210            self.log.info("Create experiment call failed for %s %s: %s" % 
2211                    (eid, fid, e))
2212            self.clear_placeholder(eid, expid, tmpdir)
2213            raise e
2214
2215        # Start the background swapper and return the starting state.  From
2216        # here on out, the state will stick around a while.
2217
2218        # Create a logger that logs to the experiment's state object as well as
2219        # to the main log file.
2220        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2221        alloc_collector = self.list_log(self.state[eid].log)
2222        h = logging.StreamHandler(alloc_collector)
2223        # XXX: there should be a global one of these rather than repeating the
2224        # code.
2225        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2226                    '%d %b %y %H:%M:%S'))
2227        alloc_log.addHandler(h)
2228
2229        # Start a thread to do the resource allocation
2230        t  = Thread(target=self.allocate_resources,
2231                args=(allocated, masters, eid, expid, tbparams, 
2232                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2233                    connInfo, tbmap, expcert_file),
2234                name=eid)
2235        t.start()
2236
2237        rv = {
2238                'experimentID': [
2239                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2240                ],
2241                'experimentStatus': 'starting',
2242                'proof': [ proof.to_dict() ] + proofs,
2243            }
2244        self.log.info("Create experiment call succeeded for %s %s" % \
2245                (eid, fid))
2246
2247        return rv
2248   
2249    def get_experiment_fedid(self, key):
2250        """
2251        find the fedid associated with the localname key in the state database.
2252        """
2253
2254        rv = None
2255        self.state_lock.acquire()
2256        if key in self.state:
2257            rv = self.state[key].fedid
2258        self.state_lock.release()
2259        return rv
2260
2261    def check_experiment_access(self, fid, key):
2262        """
2263        Confirm that the fid has access to the experiment.  Though a request
2264        may be made in terms of a local name, the access attribute is always
2265        the experiment's fedid.
2266        """
2267        if not isinstance(key, fedid):
2268            key = self.get_experiment_fedid(key)
2269
2270        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2271
2272        if access_ok:
2273            return proof
2274        else:
2275            raise service_error(service_error.access, "Access Denied",
2276                proof)
2277
2278
2279    def get_handler(self, path, fid):
2280        """
2281        Perhaps surprisingly named, this function handles HTTP GET requests to
2282        this server (SOAP requests are POSTs).
2283        """
2284        self.log.info("Get handler %s %s" % (path, fid))
2285        # XXX: log proofs?
2286        if self.auth.check_attribute(fid, path):
2287            return ("%s/%s" % (self.repodir, path), "application/binary")
2288        else:
2289            return (None, None)
2290
2291    def update_info(self, key, force=False):
2292        top = None
2293        self.state_lock.acquire()
2294        if key in self.state:
2295            if force or self.state[key].older_than(self.info_cache_limit):
2296                top = self.state[key].top
2297                if top is not None: top = top.clone()
2298                d1, info_params, cert, d2 = \
2299                        self.get_segment_info(self.state[key], need_lock=False)
2300        self.state_lock.release()
2301
2302        if top is None: return
2303
2304        try:
2305            tmpdir = tempfile.mkdtemp(prefix="info-")
2306        except EnvironmentError:
2307            raise service_error(service_error.internal, 
2308                    "Cannot create tmp dir")
2309        cert_file = self.make_temp_certfile(cert, tmpdir)
2310
2311        data = []
2312        try:
2313            for k, (uri, aid) in info_params.items():
2314                info=self.info_segment(log=self.log, testbed=uri,
2315                            cert_file=cert_file, cert_pwd=None,
2316                            trusted_certs=self.trusted_certs,
2317                            caller=self.call_InfoSegment)
2318                info(uri, aid)
2319                data.append(info)
2320        # Clean up the tmpdir no matter what
2321        finally:
2322            if tmpdir: self.remove_dirs(tmpdir)
2323
2324        self.annotate_topology(top, data)
2325        self.state_lock.acquire()
2326        if key in self.state:
2327            self.state[key].top = top
2328            self.state[key].updated()
2329            if self.state_filename: self.write_state()
2330        self.state_lock.release()
2331
2332   
2333    def get_info(self, req, fid):
2334        """
2335        Return all the stored info about this experiment
2336        """
2337        rv = None
2338
2339        self.log.info("Info call started for %s" %  fid)
2340        req = req.get('InfoRequestBody', None)
2341        if not req:
2342            raise service_error(service_error.req,
2343                    "Bad request format (no InfoRequestBody)")
2344        exp = req.get('experiment', None)
2345        legacy = req.get('legacy', False)
2346        fresh = req.get('fresh', False)
2347        if exp:
2348            if exp.has_key('fedid'):
2349                key = exp['fedid']
2350                keytype = "fedid"
2351            elif exp.has_key('localname'):
2352                key = exp['localname']
2353                keytype = "localname"
2354            else:
2355                raise service_error(service_error.req, "Unknown lookup type")
2356        else:
2357            raise service_error(service_error.req, "No request?")
2358
2359        try:
2360            proof = self.check_experiment_access(fid, key)
2361        except service_error, e:
2362            self.log.info("Info call failed for %s: access denied" %  fid)
2363
2364
2365        self.update_info(key, fresh)
2366
2367        self.state_lock.acquire()
2368        if self.state.has_key(key):
2369            rv = self.state[key].get_info()
2370            # Copy the topo if we need legacy annotations
2371            if legacy:
2372                top = self.state[key].top
2373                if top is not None: top = top.clone()
2374        self.state_lock.release()
2375        self.log.info("Gathered Info for %s %s" % (key, fid))
2376
2377        # If the legacy visualization and topology representations are
2378        # requested, calculate them and add them to the return.
2379        if legacy and rv is not None:
2380            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2381            if top is not None:
2382                vtopo = topdl.topology_to_vtopo(top)
2383                if vtopo is not None:
2384                    rv['vtopo'] = vtopo
2385                    try:
2386                        vis = self.genviz(vtopo)
2387                    except service_error, e:
2388                        self.log.debug('Problem generating visualization: %s' \
2389                                % e)
2390                        vis = None
2391                    if vis is not None:
2392                        rv['vis'] = vis
2393        if rv:
2394            self.log.info("Info succeded for %s %s" % (key, fid))
2395            rv['proof'] = proof.to_dict()
2396            return rv
2397        else: 
2398            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2399            raise service_error(service_error.req, "No such experiment")
2400
2401    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2402            results):
2403        """
2404        Call OperateSegment on multiple testbeds and gather the results.
2405        op_params contains the parameters needed to contact that testbed, cert
2406        is a certificate containing the fedid to use, op is the operation,
2407        testbeds is a dict mapping testbed name to targets in that testbed,
2408        params are the parameters to include a,d results is a growing list of
2409        the results of the calls.
2410        """
2411        try:
2412            tmpdir = tempfile.mkdtemp(prefix="info-")
2413        except EnvironmentError:
2414            raise service_error(service_error.internal, 
2415                    "Cannot create tmp dir")
2416        cert_file = self.make_temp_certfile(cert, tmpdir)
2417
2418        try:
2419            for tb, targets in testbeds.items():
2420                if tb in op_params:
2421                    uri, aid = op_params[tb]
2422                    operate=self.operation_segment(log=self.log, testbed=uri,
2423                                cert_file=cert_file, cert_pwd=None,
2424                                trusted_certs=self.trusted_certs,
2425                                caller=self.call_OperationSegment)
2426                    if operate(uri, aid, op, targets, params):
2427                        if operate.status is not None:
2428                            results.extend(operate.status)
2429                            continue
2430                # Something went wrong in a weird way.  Add statuses
2431                # that reflect that to results
2432                for t in targets:
2433                    results.append(operation_status(t, 
2434                        operation_status.federant,
2435                        'Unexpected error on %s' % tb))
2436        # Clean up the tmpdir no matter what
2437        finally:
2438            if tmpdir: self.remove_dirs(tmpdir)
2439
2440    def do_operation(self, req, fid):
2441        """
2442        Find the testbeds holding each target and ask them to carry out the
2443        operation.  Return the statuses.
2444        """
2445        # Map an element to the testbed containing it
2446        def element_to_tb(e):
2447            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2448            elif isinstance(e, topdl.Testbed): return e.name
2449            else: return None
2450        # If d is an operation_status object, make it a dict
2451        def make_dict(d):
2452            if isinstance(d, dict): return d
2453            elif isinstance(d, operation_status): return d.to_dict()
2454            else: return { }
2455
2456        def element_name(e):
2457            if isinstance(e, topdl.Computer): return e.name
2458            elif isinstance(e, topdl.Testbed): 
2459                if e.localname: return e.localname[0]
2460                else: return None
2461            else: return None
2462
2463        self.log.info("Operation call started for %s" %  fid)
2464        req = req.get('OperationRequestBody', None)
2465        if not req:
2466            raise service_error(service_error.req,
2467                    "Bad request format (no OperationRequestBody)")
2468        exp = req.get('experiment', None)
2469        op = req.get('operation', None)
2470        targets = set(req.get('target', []))
2471        params = req.get('parameter', None)
2472
2473        if exp:
2474            if 'fedid' in exp:
2475                key = exp['fedid']
2476                keytype = "fedid"
2477            elif 'localname' in exp:
2478                key = exp['localname']
2479                keytype = "localname"
2480            else:
2481                raise service_error(service_error.req, "Unknown lookup type")
2482        else:
2483            raise service_error(service_error.req, "No request?")
2484
2485        if op is None or not targets:
2486            raise service_error(service_error.req, "No request?")
2487
2488        try:
2489            proof = self.check_experiment_access(fid, key)
2490        except service_error, e:
2491            self.log.info("Operation call failed for %s: access denied" %  fid)
2492            raise e
2493
2494        self.state_lock.acquire()
2495        if key in self.state:
2496            d1, op_params, cert, d2 = \
2497                    self.get_segment_info(self.state[key], need_lock=False,
2498                            key='tb')
2499            top = self.state[key].top
2500            if top is not None:
2501                top = top.clone()
2502        self.state_lock.release()
2503
2504        if top is None:
2505            self.log.info("Operation call failed for %s: not active" %  fid)
2506            raise service_error(service_error.partial, "No topology yet", 
2507                    proof=proof)
2508
2509        testbeds = { }
2510        results = []
2511        for e in top.elements:
2512            ename = element_name(e)
2513            if ename in targets:
2514                tb = element_to_tb(e)
2515                targets.remove(ename)
2516                if tb is not None:
2517                    if tb in testbeds: testbeds[tb].append(ename)
2518                    else: testbeds[tb] = [ ename ]
2519                else:
2520                    results.append(operation_status(e.name, 
2521                        code=operation_status.no_target, 
2522                        description='Cannot map target to testbed'))
2523
2524        for t in targets:
2525            results.append(operation_status(t, operation_status.no_target))
2526
2527        self.operate_on_segments(op_params, cert, op, testbeds, params,
2528                results)
2529
2530        self.log.info("Operation call succeeded for %s" %  fid)
2531        return { 
2532                'experiment': exp, 
2533                'status': [make_dict(r) for r in results],
2534                'proof': proof.to_dict()
2535                }
2536
2537
2538    def get_multi_info(self, req, fid):
2539        """
2540        Return all the stored info that this fedid can access
2541        """
2542        rv = { 'info': [ ], 'proof': [ ] }
2543
2544        self.log.info("Multi Info call started for %s" %  fid)
2545        self.state_lock.acquire()
2546        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2547            try:
2548                proof = self.check_experiment_access(fid, key)
2549            except service_error, e:
2550                if e.code == service_error.access:
2551                    continue
2552                else:
2553                    self.log.info("Multi Info call failed for %s: %s" %  \
2554                            (e,fid))
2555                    self.state_lock.release()
2556                    raise e
2557
2558            if self.state.has_key(key):
2559                e = self.state[key].get_info()
2560                e['proof'] = proof.to_dict()
2561                rv['info'].append(e)
2562                rv['proof'].append(proof.to_dict())
2563        self.state_lock.release()
2564        self.log.info("Multi Info call succeeded for %s" %  fid)
2565        return rv
2566
2567    def check_termination_status(self, fed_exp, force):
2568        """
2569        Confirm that the experiment is sin a valid state to stop (or force it)
2570        return the state - invalid states for deletion and force settings cause
2571        exceptions.
2572        """
2573        self.state_lock.acquire()
2574        status = fed_exp.status
2575
2576        if status:
2577            if status in ('starting', 'terminating'):
2578                if not force:
2579                    self.state_lock.release()
2580                    raise service_error(service_error.partial, 
2581                            'Experiment still being created or destroyed')
2582                else:
2583                    self.log.warning('Experiment in %s state ' % status + \
2584                            'being terminated by force.')
2585            self.state_lock.release()
2586            return status
2587        else:
2588            # No status??? trouble
2589            self.state_lock.release()
2590            raise service_error(service_error.internal,
2591                    "Experiment has no status!?")
2592
2593    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2594        ids = []
2595        term_params = { }
2596        if need_lock: self.state_lock.acquire()
2597        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2598        expcert = fed_exp.identity
2599        repo = "%s" % fed_exp.fedid
2600
2601        # Collect the allocation/segment ids into a dict keyed by the fedid
2602        # of the allocation that contains a tuple of uri, aid
2603        for i, fed in enumerate(fed_exp.get_all_allocations()):
2604            uri = fed.uri
2605            aid = fed.allocID
2606            if key == 'aid': term_params[aid] = (uri, aid)
2607            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2608
2609        if need_lock: self.state_lock.release()
2610        return ids, term_params, expcert, repo
2611
2612
2613    def get_termination_info(self, fed_exp):
2614        self.state_lock.acquire()
2615        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2616        # Change the experiment state
2617        fed_exp.status = 'terminating'
2618        fed_exp.updated()
2619        if self.state_filename: self.write_state()
2620        self.state_lock.release()
2621
2622        return ids, term_params, expcert, repo
2623
2624
2625    def deallocate_resources(self, term_params, expcert, status, force, 
2626            dealloc_log):
2627        tmpdir = None
2628        # This try block makes sure the tempdir is cleared
2629        try:
2630            # If no expcert, try the deallocation as the experiment
2631            # controller instance.
2632            if expcert and self.auth_type != 'legacy': 
2633                try:
2634                    tmpdir = tempfile.mkdtemp(prefix="term-")
2635                except EnvironmentError:
2636                    raise service_error(service_error.internal, 
2637                            "Cannot create tmp dir")
2638                cert_file = self.make_temp_certfile(expcert, tmpdir)
2639                pw = None
2640            else: 
2641                cert_file = self.cert_file
2642                pw = self.cert_pwd
2643
2644            # Stop everyone.  NB, wait_for_all waits until a thread starts
2645            # and then completes, so we can't wait if nothing starts.  So,
2646            # no tbparams, no start.
2647            if len(term_params) > 0:
2648                tp = thread_pool(self.nthreads)
2649                for k, (uri, aid) in term_params.items():
2650                    # Create and start a thread to stop the segment
2651                    tp.wait_for_slot()
2652                    t  = pooled_thread(\
2653                            target=self.terminate_segment(log=dealloc_log,
2654                                testbed=uri,
2655                                cert_file=cert_file, 
2656                                cert_pwd=pw,
2657                                trusted_certs=self.trusted_certs,
2658                                caller=self.call_TerminateSegment),
2659                            args=(uri, aid), name=k,
2660                            pdata=tp, trace_file=self.trace_file)
2661                    t.start()
2662                # Wait for completions
2663                tp.wait_for_all_done()
2664
2665            # release the allocations (failed experiments have done this
2666            # already, and starting experiments may be in odd states, so we
2667            # ignore errors releasing those allocations
2668            try: 
2669                for k, (uri, aid)  in term_params.items():
2670                    self.release_access(None, aid, uri=uri,
2671                            cert_file=cert_file, cert_pwd=pw)
2672            except service_error, e:
2673                if status != 'failed' and not force:
2674                    raise e
2675
2676        # Clean up the tmpdir no matter what
2677        finally:
2678            if tmpdir: self.remove_dirs(tmpdir)
2679
2680    def terminate_experiment(self, req, fid):
2681        """
2682        Swap this experiment out on the federants and delete the shared
2683        information
2684        """
2685        self.log.info("Terminate experiment call started for %s" % fid)
2686        tbparams = { }
2687        req = req.get('TerminateRequestBody', None)
2688        if not req:
2689            raise service_error(service_error.req,
2690                    "Bad request format (no TerminateRequestBody)")
2691
2692        key = self.get_experiment_key(req, 'experiment')
2693        try:
2694            proof = self.check_experiment_access(fid, key)
2695        except service_error, e:
2696            self.log.info(
2697                    "Terminate experiment call failed for %s: access denied" \
2698                            % fid)
2699            raise e
2700        exp = req.get('experiment', False)
2701        force = req.get('force', False)
2702
2703        dealloc_list = [ ]
2704
2705
2706        # Create a logger that logs to the dealloc_list as well as to the main
2707        # log file.
2708        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2709        dealloc_log.info("Terminating %s " %key)
2710        h = logging.StreamHandler(self.list_log(dealloc_list))
2711        # XXX: there should be a global one of these rather than repeating the
2712        # code.
2713        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2714                    '%d %b %y %H:%M:%S'))
2715        dealloc_log.addHandler(h)
2716
2717        self.state_lock.acquire()
2718        fed_exp = self.state.get(key, None)
2719        self.state_lock.release()
2720        repo = None
2721
2722        if fed_exp:
2723            status = self.check_termination_status(fed_exp, force)
2724            # get_termination_info updates the experiment state
2725            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2726            self.deallocate_resources(term_params, expcert, status, force, 
2727                    dealloc_log)
2728
2729            # Remove the terminated experiment
2730            self.state_lock.acquire()
2731            for id in ids:
2732                self.clear_experiment_authorization(id, need_state_lock=False)
2733                if id in self.state: del self.state[id]
2734
2735            if self.state_filename: self.write_state()
2736            self.state_lock.release()
2737
2738            # Delete any synch points associated with this experiment.  All
2739            # synch points begin with the fedid of the experiment.
2740            fedid_keys = set(["fedid:%s" % f for f in ids \
2741                    if isinstance(f, fedid)])
2742            for k in self.synch_store.all_keys():
2743                try:
2744                    if len(k) > 45 and k[0:46] in fedid_keys:
2745                        self.synch_store.del_value(k)
2746                except synch_store.BadDeletionError:
2747                    pass
2748            self.write_store()
2749
2750            # Remove software and other cached stuff from the filesystem.
2751            if repo:
2752                self.remove_dirs("%s/%s" % (self.repodir, repo))
2753       
2754            self.log.info("Terminate experiment succeeded for %s %s" % \
2755                    (key, fid))
2756            return { 
2757                    'experiment': exp , 
2758                    'deallocationLog': string.join(dealloc_list, ''),
2759                    'proof': [proof.to_dict()],
2760                    }
2761        else:
2762            self.log.info("Terminate experiment failed for %s %s: no state" % \
2763                    (key, fid))
2764            raise service_error(service_error.req, "No saved state")
2765
2766
2767    def GetValue(self, req, fid):
2768        """
2769        Get a value from the synchronized store
2770        """
2771        req = req.get('GetValueRequestBody', None)
2772        if not req:
2773            raise service_error(service_error.req,
2774                    "Bad request format (no GetValueRequestBody)")
2775       
2776        name = req.get('name', None)
2777        wait = req.get('wait', False)
2778        rv = { 'name': name }
2779
2780        if not name:
2781            raise service_error(service_error.req, "No name?")
2782
2783        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2784
2785        if access_ok:
2786            self.log.debug("[GetValue] asking for %s " % name)
2787            try:
2788                v = self.synch_store.get_value(name, wait)
2789            except synch_store.RevokedKeyError:
2790                # No more synch on this key
2791                raise service_error(service_error.federant, 
2792                        "Synch key %s revoked" % name)
2793            if v is not None:
2794                rv['value'] = v
2795            rv['proof'] = proof.to_dict()
2796            self.log.debug("[GetValue] got %s from %s" % (v, name))
2797            return rv
2798        else:
2799            raise service_error(service_error.access, "Access Denied",
2800                    proof=proof)
2801       
2802
2803    def SetValue(self, req, fid):
2804        """
2805        Set a value in the synchronized store
2806        """
2807        req = req.get('SetValueRequestBody', None)
2808        if not req:
2809            raise service_error(service_error.req,
2810                    "Bad request format (no SetValueRequestBody)")
2811       
2812        name = req.get('name', None)
2813        v = req.get('value', '')
2814
2815        if not name:
2816            raise service_error(service_error.req, "No name?")
2817
2818        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2819
2820        if access_ok:
2821            try:
2822                self.synch_store.set_value(name, v)
2823                self.write_store()
2824                self.log.debug("[SetValue] set %s to %s" % (name, v))
2825            except synch_store.CollisionError:
2826                # Translate into a service_error
2827                raise service_error(service_error.req,
2828                        "Value already set: %s" %name)
2829            except synch_store.RevokedKeyError:
2830                # No more synch on this key
2831                raise service_error(service_error.federant, 
2832                        "Synch key %s revoked" % name)
2833                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2834        else:
2835            raise service_error(service_error.access, "Access Denied",
2836                    proof=proof)
Note: See TracBrowser for help on using the repository browser.