source: fedd/federation/experiment_control.py @ f671ef7

compt_changesinfo-ops
Last change on this file since f671ef7 was f671ef7, checked in by Ted Faber <faber@…>, 12 years ago

Merge new info more reasonably

  • Property mode set to 100644
File size: 92.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41import topdl
42import list_log
43from ip_allocator import ip_allocator
44from ip_addr import ip_addr
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        """
388
389        self.tbmap = { }
390        lineno =0
391        try:
392            f = open(file, "r")
393            for line in f:
394                lineno += 1
395                line = line.strip()
396                if line.startswith('#') or len(line) == 0:
397                    continue
398                try:
399                    label, url = line.split(':', 1)
400                    self.tbmap[label] = url
401                except ValueError, e:
402                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
403                            "map db: %s %s" % (lineno, line, e))
404        except EnvironmentError, e:
405            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
406                    "open %s: %s" % (file, e))
407        else:
408            f.close()
409
410    def read_store(self):
411        try:
412            self.synch_store = synch_store()
413            self.synch_store.load(self.store_filename)
414            self.log.debug("[read_store]: Read store from %s" % \
415                    self.store_filename)
416        except EnvironmentError, e:
417            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
418                    % (self.state_filename, e))
419            self.synch_store = synch_store()
420
421        # Set the initial permissions on data in the store.  XXX: This ad hoc
422        # authorization attribute initialization is getting out of hand.
423        # XXX: legacy
424        if self.auth_type == 'legacy':
425            for k in self.synch_store.all_keys():
426                try:
427                    if k.startswith('fedid:'):
428                        fid = fedid(hexstr=k[6:46])
429                        if self.state.has_key(fid):
430                            for a in self.get_alloc_ids(self.state[fid]):
431                                self.auth.set_attribute(a, k)
432                except ValueError, e:
433                    self.log.warn("Cannot deduce permissions for %s" % k)
434
435
436    def write_store(self):
437        """
438        Write a new copy of synch_store after writing current state
439        to a backup.  We use the internal synch_store pickle method to avoid
440        incinsistent data.
441
442        State format is a simple pickling of the store.
443        """
444        if os.access(self.store_filename, os.W_OK):
445            copy_file(self.store_filename, \
446                    "%s.bak" % self.store_filename)
447        try:
448            self.synch_store.save(self.store_filename)
449        except EnvironmentError, e:
450            self.log.error("Can't write file %s: %s" % \
451                    (self.store_filename, e))
452        except TypeError, e:
453            self.log.error("Pickling problem (TypeError): %s" % e)
454
455
456    def remove_dirs(self, dir):
457        """
458        Remove the directory tree and all files rooted at dir.  Log any errors,
459        but continue.
460        """
461        self.log.debug("[removedirs]: removing %s" % dir)
462        try:
463            for path, dirs, files in os.walk(dir, topdown=False):
464                for f in files:
465                    os.remove(os.path.join(path, f))
466                for d in dirs:
467                    os.rmdir(os.path.join(path, d))
468            os.rmdir(dir)
469        except EnvironmentError, e:
470            self.log.error("Error deleting directory tree in %s" % e);
471
472    @staticmethod
473    def make_temp_certfile(expcert, tmpdir):
474        """
475        make a protected copy of the access certificate so the experiment
476        controller can act as the experiment principal.  mkstemp is the most
477        secure way to do that. The directory should be created by
478        mkdtemp.  Return the filename.
479        """
480        if expcert and tmpdir:
481            try:
482                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
483                f = os.fdopen(certf, 'w')
484                print >> f, expcert
485                f.close()
486            except EnvironmentError, e:
487                raise service_error(service_error.internal, 
488                        "Cannot create temp cert file?")
489            return certfn
490        else:
491            return None
492
493       
494    def generate_ssh_keys(self, dest, type="rsa" ):
495        """
496        Generate a set of keys for the gateways to use to talk.
497
498        Keys are of type type and are stored in the required dest file.
499        """
500        valid_types = ("rsa", "dsa")
501        t = type.lower();
502        if t not in valid_types: raise ValueError
503        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
504
505        try:
506            trace = open("/dev/null", "w")
507        except EnvironmentError:
508            raise service_error(service_error.internal,
509                    "Cannot open /dev/null??");
510
511        # May raise CalledProcessError
512        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
513        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
514        if rv != 0:
515            raise service_error(service_error.internal, 
516                    "Cannot generate nonce ssh keys.  %s return code %d" \
517                            % (self.ssh_keygen, rv))
518
519    def generate_seer_certs(self, destdir):
520        '''
521        Create a SEER ca cert and a node cert in destdir/ca.pem and
522        destdir/node.pem respectively.  These will be distributed throughout
523        the federated experiment.  This routine reports errors via
524        service_errors.
525        '''
526        openssl = '/usr/bin/openssl'
527        # All the filenames and parameters we need for openssl calls below
528        ca_key =os.path.join(destdir, 'ca.key') 
529        ca_pem = os.path.join(destdir, 'ca.pem')
530        node_key =os.path.join(destdir, 'node.key') 
531        node_pem = os.path.join(destdir, 'node.pem')
532        node_req = os.path.join(destdir, 'node.req')
533        node_signed = os.path.join(destdir, 'node.signed')
534        days = '%s' % (365 * 10)
535        serial = '%s' % random.randint(0, 1<<16)
536
537        try:
538            # Sequence of calls to create a CA key, create a ca cert, create a
539            # node key, node signing request, and finally a signed node
540            # certificate.
541            sequence = (
542                    (openssl, 'genrsa', '-out', ca_key, '1024'),
543                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
544                        ca_pem, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
546                    (openssl, 'genrsa', '-out', node_key, '1024'),
547                    (openssl, 'req', '-new', '-key', node_key, '-out', 
548                        node_req, '-days', days, '-subj', 
549                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
550                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
551                        '-set_serial', serial, '-req', '-in', node_req, 
552                        '-out', node_signed, '-days', days),
553                )
554            # Do all that stuff; bail if there's an error, and push all the
555            # output to dev/null.
556            for cmd in sequence:
557                trace = open("/dev/null", "w")
558                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
559                if rv != 0:
560                    raise service_error(service_error.internal, 
561                            "Cannot generate SEER certs.  %s return code %d" \
562                                    % (' '.join(cmd), rv))
563            # Concatinate the node key and signed certificate into node.pem
564            f = open(node_pem, 'w')
565            for comp in (node_signed, node_key):
566                g = open(comp, 'r')
567                f.write(g.read())
568                g.close()
569            f.close()
570
571            # Throw out intermediaries.
572            for fn in (ca_key, node_key, node_req, node_signed):
573                os.unlink(fn)
574
575        except EnvironmentError, e:
576            # Any difficulties with the file system wind up here
577            raise service_error(service_error.internal,
578                    "File error on  %s while creating SEER certs: %s" % \
579                            (e.filename, e.strerror))
580
581
582
583    def gentopo(self, str):
584        """
585        Generate the topology data structure from the splitter's XML
586        representation of it.
587
588        The topology XML looks like:
589            <experiment>
590                <nodes>
591                    <node><vname></vname><ips>ip1:ip2</ips></node>
592                </nodes>
593                <lans>
594                    <lan>
595                        <vname></vname><vnode></vnode><ip></ip>
596                        <bandwidth></bandwidth><member>node:port</member>
597                    </lan>
598                </lans>
599        """
600        class topo_parse:
601            """
602            Parse the topology XML and create the dats structure.
603            """
604            def __init__(self):
605                # Typing of the subelements for data conversion
606                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
607                self.int_subelements = ( 'bandwidth',)
608                self.float_subelements = ( 'delay',)
609                # The final data structure
610                self.nodes = [ ]
611                self.lans =  [ ]
612                self.topo = { \
613                        'node': self.nodes,\
614                        'lan' : self.lans,\
615                    }
616                self.element = { }  # Current element being created
617                self.chars = ""     # Last text seen
618
619            def end_element(self, name):
620                # After each sub element the contents is added to the current
621                # element or to the appropriate list.
622                if name == 'node':
623                    self.nodes.append(self.element)
624                    self.element = { }
625                elif name == 'lan':
626                    self.lans.append(self.element)
627                    self.element = { }
628                elif name in self.str_subelements:
629                    self.element[name] = self.chars
630                    self.chars = ""
631                elif name in self.int_subelements:
632                    self.element[name] = int(self.chars)
633                    self.chars = ""
634                elif name in self.float_subelements:
635                    self.element[name] = float(self.chars)
636                    self.chars = ""
637
638            def found_chars(self, data):
639                self.chars += data.rstrip()
640
641
642        tp = topo_parse();
643        parser = xml.parsers.expat.ParserCreate()
644        parser.EndElementHandler = tp.end_element
645        parser.CharacterDataHandler = tp.found_chars
646
647        parser.Parse(str)
648
649        return tp.topo
650       
651
652    def genviz(self, topo):
653        """
654        Generate the visualization the virtual topology
655        """
656
657        neato = "/usr/local/bin/neato"
658        # These are used to parse neato output and to create the visualization
659        # file.
660        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
661        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
662                "%s</type></node>"
663
664        try:
665            # Node names
666            nodes = [ n['vname'] for n in topo['node'] ]
667            topo_lans = topo['lan']
668        except KeyError, e:
669            raise service_error(service_error.internal, "Bad topology: %s" %e)
670
671        lans = { }
672        links = { }
673
674        # Walk through the virtual topology, organizing the connections into
675        # 2-node connections (links) and more-than-2-node connections (lans).
676        # When a lan is created, it's added to the list of nodes (there's a
677        # node in the visualization for the lan).
678        for l in topo_lans:
679            if links.has_key(l['vname']):
680                if len(links[l['vname']]) < 2:
681                    links[l['vname']].append(l['vnode'])
682                else:
683                    nodes.append(l['vname'])
684                    lans[l['vname']] = links[l['vname']]
685                    del links[l['vname']]
686                    lans[l['vname']].append(l['vnode'])
687            elif lans.has_key(l['vname']):
688                lans[l['vname']].append(l['vnode'])
689            else:
690                links[l['vname']] = [ l['vnode'] ]
691
692
693        # Open up a temporary file for dot to turn into a visualization
694        try:
695            df, dotname = tempfile.mkstemp()
696            dotfile = os.fdopen(df, 'w')
697        except EnvironmentError:
698            raise service_error(service_error.internal,
699                    "Failed to open file in genviz")
700
701        try:
702            dnull = open('/dev/null', 'w')
703        except EnvironmentError:
704            service_error(service_error.internal,
705                    "Failed to open /dev/null in genviz")
706
707        # Generate a dot/neato input file from the links, nodes and lans
708        try:
709            print >>dotfile, "graph G {"
710            for n in nodes:
711                print >>dotfile, '\t"%s"' % n
712            for l in links.keys():
713                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
714            for l in lans.keys():
715                for n in lans[l]:
716                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
717            print >>dotfile, "}"
718            dotfile.close()
719        except TypeError:
720            raise service_error(service_error.internal,
721                    "Single endpoint link in vtopo")
722        except EnvironmentError:
723            raise service_error(service_error.internal, "Cannot write dot file")
724
725        # Use dot to create a visualization
726        try:
727            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
728                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
729                stderr=dnull, close_fds=True)
730        except EnvironmentError:
731            raise service_error(service_error.internal, 
732                    "Cannot generate visualization: is graphviz available?")
733        dnull.close()
734
735        # Translate dot to vis format
736        vis_nodes = [ ]
737        vis = { 'node': vis_nodes }
738        for line in dot.stdout:
739            m = vis_re.match(line)
740            if m:
741                vn = m.group(1)
742                vis_node = {'name': vn, \
743                        'x': float(m.group(2)),\
744                        'y' : float(m.group(3)),\
745                    }
746                if vn in links.keys() or vn in lans.keys():
747                    vis_node['type'] = 'lan'
748                else:
749                    vis_node['type'] = 'node'
750                vis_nodes.append(vis_node)
751        rv = dot.wait()
752
753        os.remove(dotname)
754        # XXX: graphviz seems to use low return codes for warnings, like
755        # "couldn't find font"
756        if rv < 2 : return vis
757        else: return None
758
759
760    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
761            cert_pwd=None):
762        """
763        Release access to testbed through fedd
764        """
765
766        if not uri and tbmap:
767            uri = tbmap.get(tb, None)
768        if not uri:
769            raise service_error(service_error.server_config, 
770                    "Unknown testbed: %s" % tb)
771
772        if self.local_access.has_key(uri):
773            resp = self.local_access[uri].ReleaseAccess(\
774                    { 'ReleaseAccessRequestBody' : 
775                        {'allocID': {'fedid': aid}},}, 
776                    fedid(file=cert_file))
777            resp = { 'ReleaseAccessResponseBody': resp } 
778        else:
779            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
780                    cert_file, cert_pwd, self.trusted_certs)
781
782        # better error coding
783
784    def remote_ns2topdl(self, uri, desc):
785
786        req = {
787                'description' : { 'ns2description': desc },
788            }
789
790        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
791                self.trusted_certs)
792
793        if r.has_key('Ns2TopdlResponseBody'):
794            r = r['Ns2TopdlResponseBody']
795            ed = r.get('experimentdescription', None)
796            if ed.has_key('topdldescription'):
797                return topdl.Topology(**ed['topdldescription'])
798            else:
799                raise service_error(service_error.protocol, 
800                        "Bad splitter response (no output)")
801        else:
802            raise service_error(service_error.protocol, "Bad splitter response")
803
804    class start_segment:
805        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
806                cert_pwd=None, trusted_certs=None, caller=None,
807                log_collector=None):
808            self.log = log
809            self.debug = debug
810            self.cert_file = cert_file
811            self.cert_pwd = cert_pwd
812            self.trusted_certs = None
813            self.caller = caller
814            self.testbed = testbed
815            self.log_collector = log_collector
816            self.response = None
817            self.node = { }
818            self.proof = None
819
820        def make_map(self, resp):
821            if 'segmentdescription' not in resp  or \
822                    'topdldescription' not in resp['segmentdescription']:
823                self.log.warn('No topology returned from startsegment')
824                return 
825
826            top = topdl.Topology(
827                    **resp['segmentdescription']['topdldescription'])
828
829            for e in top.elements:
830                if isinstance(e, topdl.Computer):
831                    self.node[e.name] = e
832
833        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
834            req = {
835                    'allocID': { 'fedid' : aid }, 
836                    'segmentdescription': { 
837                        'topdldescription': topo.to_dict(),
838                    },
839                }
840
841            if connInfo:
842                req['connection'] = connInfo
843
844            import_svcs = [ s for m in masters.values() \
845                    for s in m if self.testbed in s.importers]
846
847            if import_svcs or self.testbed in masters:
848                req['service'] = []
849
850            for s in import_svcs:
851                for r in s.reqs:
852                    sr = copy.deepcopy(r)
853                    sr['visibility'] = 'import';
854                    req['service'].append(sr)
855
856            for s in masters.get(self.testbed, []):
857                for r in s.reqs:
858                    sr = copy.deepcopy(r)
859                    sr['visibility'] = 'export';
860                    req['service'].append(sr)
861
862            if attrs:
863                req['fedAttr'] = attrs
864
865            try:
866                self.log.debug("Calling StartSegment at %s " % uri)
867                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
868                        self.trusted_certs)
869                if r.has_key('StartSegmentResponseBody'):
870                    lval = r['StartSegmentResponseBody'].get('allocationLog',
871                            None)
872                    if lval and self.log_collector:
873                        for line in  lval.splitlines(True):
874                            self.log_collector.write(line)
875                    self.make_map(r['StartSegmentResponseBody'])
876                    if 'proof' in r: self.proof = r['proof']
877                    self.response = r
878                else:
879                    raise service_error(service_error.internal, 
880                            "Bad response!?: %s" %r)
881                return True
882            except service_error, e:
883                self.log.error("Start segment failed on %s: %s" % \
884                        (self.testbed, e))
885                return False
886
887
888
889    class terminate_segment:
890        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
891                cert_pwd=None, trusted_certs=None, caller=None):
892            self.log = log
893            self.debug = debug
894            self.cert_file = cert_file
895            self.cert_pwd = cert_pwd
896            self.trusted_certs = None
897            self.caller = caller
898            self.testbed = testbed
899
900        def __call__(self, uri, aid ):
901            req = {
902                    'allocID': {'fedid': aid }, 
903                }
904            try:
905                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
906                        self.trusted_certs)
907                return True
908            except service_error, e:
909                self.log.error("Terminate segment failed on %s: %s" % \
910                        (self.testbed, e))
911                return False
912
913    class info_segment(start_segment):
914        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
915                cert_pwd=None, trusted_certs=None, caller=None,
916                log_collector=None):
917            experiment_control_local.start_segment.__init__(self, debug, 
918                    log, testbed, cert_file, cert_pwd, trusted_certs, 
919                    caller, log_collector)
920
921        def __call__(self, uri, aid):
922            req = { 'allocID': { 'fedid' : aid } }
923
924            try:
925                self.log.debug("Calling InfoSegment at %s " % uri)
926                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
927                        self.trusted_certs)
928                if r.has_key('InfoSegmentResponseBody'):
929                    self.make_map(r['InfoSegmentResponseBody'])
930                    if 'proof' in r: self.proof = r['proof']
931                    self.response = r
932                else:
933                    raise service_error(service_error.internal, 
934                            "Bad response!?: %s" %r)
935                return True
936            except service_error, e:
937                self.log.error("Info segment failed on %s: %s" % \
938                        (self.testbed, e))
939                return False
940
941    class operation_segment:
942        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
943                cert_pwd=None, trusted_certs=None, caller=None,
944                log_collector=None):
945            self.log = log
946            self.debug = debug
947            self.cert_file = cert_file
948            self.cert_pwd = cert_pwd
949            self.trusted_certs = None
950            self.caller = caller
951            self.testbed = testbed
952            self.status = None
953
954        def __call__(self, uri, aid, op, targets, params):
955            req = { 
956                    'allocID': { 'fedid' : aid },
957                    'operation': op,
958                    'target': targets,
959                    }
960            if params: req['parameter'] = params
961
962
963            try:
964                self.log.debug("Calling OperationSegment at %s " % uri)
965                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
966                        self.trusted_certs)
967                if 'OperationSegmentResponseBody' in r:
968                    r = r['OperationSegmentResponseBody']
969                    if 'status' in r:
970                        self.status = r['status']
971                else:
972                    raise service_error(service_error.internal, 
973                            "Bad response!?: %s" %r)
974                return True
975            except service_error, e:
976                self.log.error("Operation segment failed on %s: %s" % \
977                        (self.testbed, e))
978                return False
979
980    def annotate_topology(self, top, data):
981        # These routines do various parts of the annotation
982        def add_new_names(nl, l):
983            """ add any names in nl to the list in l """
984            for n in nl:
985                if n not in l: l.append(n)
986       
987        def merge_services(ne, e):
988            for ns in ne.service:
989                # NB: the else is on the for
990                for s in e.service:
991                    if ns.name == s.name:
992                        s.importer = ns.importer
993                        s.param = ns.param
994                        s.description = ns.description
995                        s.status = ns.status
996                        break
997                else:
998                    e.service.append(ns)
999       
1000        def merge_oses(ne, e):
1001            """
1002            Merge the operating system entries of ne into e
1003            """
1004            for nos in ne.os:
1005                # NB: the else is on the for
1006                for os in e.os:
1007                    if nos.name == eos.name:
1008                        eos.version = nos.version
1009                        eos.version = nos.distribution
1010                        eos.version = nos.distributionversion
1011                        for a in nos.attribute:
1012                            eos.set_attribute(a.attribute, a.value)
1013                        break
1014                else:
1015                    e.os.append(nos)
1016
1017        # Annotate the topology with embedding info
1018        for e in top.elements:
1019            if isinstance(e, topdl.Computer):
1020                for s in data:
1021                    ne = s.node.get(e.name, None)
1022                    if ne is not None:
1023                        add_new_names(ne.localname, e.localname)
1024                        e.status = ne.status
1025                        merge_services(ne, e)
1026                        add_new_names(ne.operation, e.operation)
1027                        if ne.os: merge_oses(ne, e)
1028                        break
1029
1030
1031
1032    def allocate_resources(self, allocated, masters, eid, expid, 
1033            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1034            attrs=None, connInfo={}, tbmap=None, expcert=None):
1035
1036        started = { }           # Testbeds where a sub-experiment started
1037                                # successfully
1038
1039        # XXX
1040        fail_soft = False
1041
1042        if tbmap is None: tbmap = { }
1043
1044        log = alloc_log or self.log
1045
1046        tp = thread_pool(self.nthreads)
1047        threads = [ ]
1048        starters = [ ]
1049
1050        if expcert:
1051            cert = expcert
1052            pw = None
1053        else:
1054            cert = self.cert_file
1055            pw = self.cert_pwd
1056
1057        for tb in allocated.keys():
1058            # Create and start a thread to start the segment, and save it
1059            # to get the return value later
1060            tb_attrs = copy.copy(attrs)
1061            tp.wait_for_slot()
1062            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1063            base, suffix = split_testbed(tb)
1064            if suffix:
1065                tb_attrs.append({'attribute': 'experiment_name', 
1066                    'value': "%s-%s" % (eid, suffix)})
1067            else:
1068                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1069            if not uri:
1070                raise service_error(service_error.internal, 
1071                        "Unknown testbed %s !?" % tb)
1072
1073            aid = tbparams[tb].allocID
1074            if not aid:
1075                raise service_error(service_error.internal, 
1076                        "No alloc id for testbed %s !?" % tb)
1077
1078            s = self.start_segment(log=log, debug=self.debug,
1079                    testbed=tb, cert_file=cert,
1080                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1081                    caller=self.call_StartSegment,
1082                    log_collector=log_collector)
1083            starters.append(s)
1084            t  = pooled_thread(\
1085                    target=s, name=tb,
1086                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1087                    pdata=tp, trace_file=self.trace_file)
1088            threads.append(t)
1089            t.start()
1090
1091        # Wait until all finish (keep pinging the log, though)
1092        mins = 0
1093        revoked = False
1094        while not tp.wait_for_all_done(60.0):
1095            mins += 1
1096            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1097                    % mins)
1098            if not revoked and \
1099                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1100                # a testbed has failed.  Revoke this experiment's
1101                # synchronizarion values so that sub experiments will not
1102                # deadlock waiting for synchronization that will never happen
1103                self.log.info("A subexperiment has failed to swap in, " + \
1104                        "revoking synch keys")
1105                var_key = "fedid:%s" % expid
1106                for k in self.synch_store.all_keys():
1107                    if len(k) > 45 and k[0:46] == var_key:
1108                        self.synch_store.revoke_key(k)
1109                revoked = True
1110
1111        failed = [ t.getName() for t in threads if not t.rv ]
1112        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1113
1114        # If one failed clean up, unless fail_soft is set
1115        if failed:
1116            if not fail_soft:
1117                tp.clear()
1118                for tb in succeeded:
1119                    # Create and start a thread to stop the segment
1120                    tp.wait_for_slot()
1121                    uri = tbparams[tb].uri
1122                    t  = pooled_thread(\
1123                            target=self.terminate_segment(log=log,
1124                                testbed=tb,
1125                                cert_file=cert, 
1126                                cert_pwd=pw,
1127                                trusted_certs=self.trusted_certs,
1128                                caller=self.call_TerminateSegment),
1129                            args=(uri, tbparams[tb].allocID),
1130                            name=tb,
1131                            pdata=tp, trace_file=self.trace_file)
1132                    t.start()
1133                # Wait until all finish (if any are being stopped)
1134                if succeeded:
1135                    tp.wait_for_all_done()
1136
1137                # release the allocations
1138                for tb in tbparams.keys():
1139                    try:
1140                        self.release_access(tb, tbparams[tb].allocID, 
1141                                tbmap=tbmap, uri=tbparams[tb].uri,
1142                                cert_file=cert, cert_pwd=pw)
1143                    except service_error, e:
1144                        self.log.warn("Error releasing access: %s" % e.desc)
1145                # Remove the placeholder
1146                self.state_lock.acquire()
1147                self.state[eid].status = 'failed'
1148                self.state[eid].updated()
1149                if self.state_filename: self.write_state()
1150                self.state_lock.release()
1151                # Remove the repo dir
1152                self.remove_dirs("%s/%s" %(self.repodir, expid))
1153                # Walk up tmpdir, deleting as we go
1154                if self.cleanup:
1155                    self.remove_dirs(tmpdir)
1156                else:
1157                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1158
1159
1160                log.error("Swap in failed on %s" % ",".join(failed))
1161                return
1162        else:
1163            # Walk through the successes and gather the proofs
1164            proofs = { }
1165            for s in starters:
1166                if s.proof:
1167                    proofs[s.testbed] = s.proof
1168            self.annotate_topology(top, starters)
1169            log.info("[start_segment]: Experiment %s active" % eid)
1170
1171
1172        # Walk up tmpdir, deleting as we go
1173        if self.cleanup:
1174            self.remove_dirs(tmpdir)
1175        else:
1176            log.debug("[start_experiment]: not removing %s" % tmpdir)
1177
1178        # Insert the experiment into our state and update the disk copy.
1179        self.state_lock.acquire()
1180        self.state[expid].status = 'active'
1181        self.state[eid] = self.state[expid]
1182        self.state[eid].top = top
1183        self.state[eid].updated()
1184        # Append startup proofs
1185        for f in self.state[eid].get_all_allocations():
1186            if f.tb in proofs:
1187                f.proof.append(proofs[f.tb])
1188
1189        if self.state_filename: self.write_state()
1190        self.state_lock.release()
1191        return
1192
1193
1194    def add_kit(self, e, kit):
1195        """
1196        Add a Software object created from the list of (install, location)
1197        tuples passed as kit  to the software attribute of an object e.  We
1198        do this enough to break out the code, but it's kind of a hack to
1199        avoid changing the old tuple rep.
1200        """
1201
1202        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1203
1204        if isinstance(e.software, list): e.software.extend(s)
1205        else: e.software = s
1206
1207    def append_experiment_authorization(self, expid, attrs, 
1208            need_state_lock=True):
1209        """
1210        Append the authorization information to system state
1211        """
1212
1213        for p, a in attrs:
1214            self.auth.set_attribute(p, a)
1215        self.auth.save()
1216
1217        if need_state_lock: self.state_lock.acquire()
1218        # XXX: really a no op?
1219        #self.state[expid]['auth'].update(attrs)
1220        if self.state_filename: self.write_state()
1221        if need_state_lock: self.state_lock.release()
1222
1223    def clear_experiment_authorization(self, expid, need_state_lock=True):
1224        """
1225        Attrs is a set of attribute principal pairs that need to be removed
1226        from the authenticator.  Remove them and save the authenticator.
1227        """
1228
1229        if need_state_lock: self.state_lock.acquire()
1230        # XXX: should be a no-op
1231        #if expid in self.state and 'auth' in self.state[expid]:
1232            #for p, a in self.state[expid]['auth']:
1233                #self.auth.unset_attribute(p, a)
1234            #self.state[expid]['auth'] = set()
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237        self.auth.save()
1238
1239
1240    def create_experiment_state(self, fid, req, expid, expcert,
1241            state='starting'):
1242        """
1243        Create the initial entry in the experiment's state.  The expid and
1244        expcert are the experiment's fedid and certifacte that represents that
1245        ID, which are installed in the experiment state.  If the request
1246        includes a suggested local name that is used if possible.  If the local
1247        name is already taken by an experiment owned by this user that has
1248        failed, it is overwritten.  Otherwise new letters are added until a
1249        valid localname is found.  The generated local name is returned.
1250        """
1251
1252        if req.has_key('experimentID') and \
1253                req['experimentID'].has_key('localname'):
1254            overwrite = False
1255            eid = req['experimentID']['localname']
1256            # If there's an old failed experiment here with the same local name
1257            # and accessible by this user, we'll overwrite it, otherwise we'll
1258            # fall through and do the collision avoidance.
1259            old_expid = self.get_experiment_fedid(eid)
1260            if old_expid:
1261                users_experiment = True
1262                try:
1263                    self.check_experiment_access(fid, old_expid)
1264                except service_error, e:
1265                    if e.code == service_error.access: users_experiment = False
1266                    else: raise e
1267                if users_experiment:
1268                    self.state_lock.acquire()
1269                    status = self.state[eid].status
1270                    if status and status == 'failed':
1271                        # remove the old access attributes
1272                        self.clear_experiment_authorization(eid,
1273                                need_state_lock=False)
1274                        overwrite = True
1275                        del self.state[eid]
1276                        del self.state[old_expid]
1277                    self.state_lock.release()
1278                else:
1279                    self.log.info('Experiment %s exists, ' % eid + \
1280                            'but this user cannot access it')
1281            self.state_lock.acquire()
1282            while (self.state.has_key(eid) and not overwrite):
1283                eid += random.choice(string.ascii_letters)
1284            # Initial state
1285            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1286                    identity=expcert)
1287            self.state[expid] = self.state[eid]
1288            if self.state_filename: self.write_state()
1289            self.state_lock.release()
1290        else:
1291            eid = self.exp_stem
1292            for i in range(0,5):
1293                eid += random.choice(string.ascii_letters)
1294            self.state_lock.acquire()
1295            while (self.state.has_key(eid)):
1296                eid = self.exp_stem
1297                for i in range(0,5):
1298                    eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305
1306        # Let users touch the state.  Authorize this fid and the expid itself
1307        # to touch the experiment, as well as allowing th eoverrides.
1308        self.append_experiment_authorization(eid, 
1309                set([(fid, expid), (expid,expid)] + \
1310                        [ (o, expid) for o in self.overrides]))
1311
1312        return eid
1313
1314
1315    def allocate_ips_to_topo(self, top):
1316        """
1317        Add an ip4_address attribute to all the hosts in the topology, based on
1318        the shared substrates on which they sit.  An /etc/hosts file is also
1319        created and returned as a list of hostfiles entries.  We also return
1320        the allocator, because we may need to allocate IPs to portals
1321        (specifically DRAGON portals).
1322        """
1323        subs = sorted(top.substrates, 
1324                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1325                reverse=True)
1326        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1327        ifs = { }
1328        hosts = [ ]
1329
1330        for idx, s in enumerate(subs):
1331            net_size = len(s.interfaces)+2
1332
1333            a = ips.allocate(net_size)
1334            if a :
1335                base, num = a
1336                if num < net_size: 
1337                    raise service_error(service_error.internal,
1338                            "Allocator returned wrong number of IPs??")
1339            else:
1340                raise service_error(service_error.req, 
1341                        "Cannot allocate IP addresses")
1342            mask = ips.min_alloc
1343            while mask < net_size:
1344                mask *= 2
1345
1346            netmask = ((2**32-1) ^ (mask-1))
1347
1348            base += 1
1349            for i in s.interfaces:
1350                i.attribute.append(
1351                        topdl.Attribute('ip4_address', 
1352                            "%s" % ip_addr(base)))
1353                i.attribute.append(
1354                        topdl.Attribute('ip4_netmask', 
1355                            "%s" % ip_addr(int(netmask))))
1356
1357                hname = i.element.name
1358                if ifs.has_key(hname):
1359                    hosts.append("%s\t%s-%s %s-%d" % \
1360                            (ip_addr(base), hname, s.name, hname,
1361                                ifs[hname]))
1362                else:
1363                    ifs[hname] = 0
1364                    hosts.append("%s\t%s-%s %s-%d %s" % \
1365                            (ip_addr(base), hname, s.name, hname,
1366                                ifs[hname], hname))
1367
1368                ifs[hname] += 1
1369                base += 1
1370        return hosts, ips
1371
1372    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1373            tbparam, masters, tbmap, expid=None, expcert=None):
1374        for tb in testbeds:
1375            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1376                    expcert)
1377            allocated[tb] = 1
1378
1379    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1380            expcert=None):
1381        """
1382        Get access to testbed through fedd and set the parameters for that tb
1383        """
1384        def get_export_project(svcs):
1385            """
1386            Look through for the list of federated_service for this testbed
1387            objects for a project_export service, and extract the project
1388            parameter.
1389            """
1390
1391            pe = [s for s in svcs if s.name=='project_export']
1392            if len(pe) == 1:
1393                return pe[0].params.get('project', None)
1394            elif len(pe) == 0:
1395                return None
1396            else:
1397                raise service_error(service_error.req,
1398                        "More than one project export is not supported")
1399
1400        def add_services(svcs, type, slist, keys):
1401            """
1402            Add the given services to slist.  type is import or export.  Also
1403            add a mapping entry from the assigned id to the original service
1404            record.
1405            """
1406            for i, s in enumerate(svcs):
1407                idx = '%s%d' % (type, i)
1408                keys[idx] = s
1409                sr = {'id': idx, 'name': s.name, 'visibility': type }
1410                if s.params:
1411                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1412                            for k, v in s.params.items()]
1413                slist.append(sr)
1414
1415        uri = tbmap.get(testbed_base(tb), None)
1416        if not uri:
1417            raise service_error(service_error.server_config, 
1418                    "Unknown testbed: %s" % tb)
1419
1420        export_svcs = masters.get(tb,[])
1421        import_svcs = [ s for m in masters.values() \
1422                for s in m \
1423                    if tb in s.importers ]
1424
1425        export_project = get_export_project(export_svcs)
1426        # Compose the credential list so that IDs come before attributes
1427        creds = set()
1428        keys = set()
1429        certs = self.auth.get_creds_for_principal(fid)
1430        if expid:
1431            certs.update(self.auth.get_creds_for_principal(expid))
1432        for c in certs:
1433            keys.add(c.issuer_cert())
1434            creds.add(c.attribute_cert())
1435        creds = list(keys) + list(creds)
1436
1437        if expcert: cert, pw = expcert, None
1438        else: cert, pw = self.cert_file, self.cert_pw
1439
1440        # Request credentials
1441        req = {
1442                'abac_credential': creds,
1443            }
1444        # Make the service request from the services we're importing and
1445        # exporting.  Keep track of the export request ids so we can
1446        # collect the resulting info from the access response.
1447        e_keys = { }
1448        if import_svcs or export_svcs:
1449            slist = []
1450            add_services(import_svcs, 'import', slist, e_keys)
1451            add_services(export_svcs, 'export', slist, e_keys)
1452            req['service'] = slist
1453
1454        if self.local_access.has_key(uri):
1455            # Local access call
1456            req = { 'RequestAccessRequestBody' : req }
1457            r = self.local_access[uri].RequestAccess(req, 
1458                    fedid(file=self.cert_file))
1459            r = { 'RequestAccessResponseBody' : r }
1460        else:
1461            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1462
1463        if r.has_key('RequestAccessResponseBody'):
1464            # Through to here we have a valid response, not a fault.
1465            # Access denied is a fault, so something better or worse than
1466            # access denied has happened.
1467            r = r['RequestAccessResponseBody']
1468            self.log.debug("[get_access] Access granted")
1469        else:
1470            raise service_error(service_error.protocol,
1471                        "Bad proxy response")
1472        if 'proof' not in r:
1473            raise service_error(service_error.protocol,
1474                        "Bad access response (no access proof)")
1475
1476        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1477                tb=tb, uri=uri, proof=[r['proof']], 
1478                services=masters.get(tb, None))
1479
1480        # Collect the responses corresponding to the services this testbed
1481        # exports.  These will be the service requests that we will include in
1482        # the start segment requests (with appropriate visibility values) to
1483        # import and export the segments.
1484        for s in r.get('service', []):
1485            id = s.get('id', None)
1486            # Note that this attaches the response to the object in the masters
1487            # data structure.  (The e_keys index disappears when this fcn
1488            # returns)
1489            if id and id in e_keys:
1490                e_keys[id].reqs.append(s)
1491
1492        # Add attributes to parameter space.  We don't allow attributes to
1493        # overlay any parameters already installed.
1494        for a in r.get('fedAttr', []):
1495            try:
1496                if a['attribute']:
1497                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1498            except KeyError:
1499                self.log.error("Bad attribute in response: %s" % a)
1500
1501
1502    def split_topology(self, top, topo, testbeds):
1503        """
1504        Create the sub-topologies that are needed for experiment instantiation.
1505        """
1506        for tb in testbeds:
1507            topo[tb] = top.clone()
1508            # copy in for loop allows deletions from the original
1509            for e in [ e for e in topo[tb].elements]:
1510                etb = e.get_attribute('testbed')
1511                # NB: elements without a testbed attribute won't appear in any
1512                # sub topologies. 
1513                if not etb or etb != tb:
1514                    for i in e.interface:
1515                        for s in i.subs:
1516                            try:
1517                                s.interfaces.remove(i)
1518                            except ValueError:
1519                                raise service_error(service_error.internal,
1520                                        "Can't remove interface??")
1521                    topo[tb].elements.remove(e)
1522            topo[tb].make_indices()
1523
1524    def confirm_software(self, top):
1525        """
1526        Make sure that the software to be loaded in the topo is all available
1527        before we begin making access requests, etc.  This is a subset of
1528        wrangle_software.
1529        """
1530        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1531        pkgs.update([x.location for e in top.elements for x in e.software])
1532
1533        for pkg in pkgs:
1534            loc = pkg
1535
1536            scheme, host, path = urlparse(loc)[0:3]
1537            dest = os.path.basename(path)
1538            if not scheme:
1539                if not loc.startswith('/'):
1540                    loc = "/%s" % loc
1541                loc = "file://%s" %loc
1542            # NB: if scheme was found, loc == pkg
1543            try:
1544                u = urlopen(loc)
1545                u.close()
1546            except Exception, e:
1547                raise service_error(service_error.req, 
1548                        "Cannot open %s: %s" % (loc, e))
1549        return True
1550
1551    def wrangle_software(self, expid, top, topo, tbparams):
1552        """
1553        Copy software out to the repository directory, allocate permissions and
1554        rewrite the segment topologies to look for the software in local
1555        places.
1556        """
1557
1558        # Copy the rpms and tarfiles to a distribution directory from
1559        # which the federants can retrieve them
1560        linkpath = "%s/software" %  expid
1561        softdir ="%s/%s" % ( self.repodir, linkpath)
1562        softmap = { }
1563
1564        # self.fedkit and self.gateway kit are lists of tuples of
1565        # (install_location, download_location) this extracts the download
1566        # locations.
1567        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1568        pkgs.update([x.location for e in top.elements for x in e.software])
1569        try:
1570            os.makedirs(softdir)
1571        except EnvironmentError, e:
1572            raise service_error(
1573                    "Cannot create software directory: %s" % e)
1574        # The actual copying.  Everything's converted into a url for copying.
1575        auth_attrs = set()
1576        for pkg in pkgs:
1577            loc = pkg
1578
1579            scheme, host, path = urlparse(loc)[0:3]
1580            dest = os.path.basename(path)
1581            if not scheme:
1582                if not loc.startswith('/'):
1583                    loc = "/%s" % loc
1584                loc = "file://%s" %loc
1585            # NB: if scheme was found, loc == pkg
1586            try:
1587                u = urlopen(loc)
1588            except Exception, e:
1589                raise service_error(service_error.req, 
1590                        "Cannot open %s: %s" % (loc, e))
1591            try:
1592                f = open("%s/%s" % (softdir, dest) , "w")
1593                self.log.debug("Writing %s/%s" % (softdir,dest) )
1594                data = u.read(4096)
1595                while data:
1596                    f.write(data)
1597                    data = u.read(4096)
1598                f.close()
1599                u.close()
1600            except Exception, e:
1601                raise service_error(service_error.internal,
1602                        "Could not copy %s: %s" % (loc, e))
1603            path = re.sub("/tmp", "", linkpath)
1604            # XXX
1605            softmap[pkg] = \
1606                    "%s/%s/%s" %\
1607                    ( self.repo_url, path, dest)
1608
1609            # Allow the individual segments to access the software by assigning
1610            # an attribute to each testbed allocation that encodes the data to
1611            # be released.  This expression collects the data for each run of
1612            # the loop.
1613            auth_attrs.update([
1614                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1615                        for tb in tbparams.keys()])
1616
1617        self.append_experiment_authorization(expid, auth_attrs)
1618
1619        # Convert the software locations in the segments into the local
1620        # copies on this host
1621        for soft in [ s for tb in topo.values() \
1622                for e in tb.elements \
1623                    if getattr(e, 'software', False) \
1624                        for s in e.software ]:
1625            if softmap.has_key(soft.location):
1626                soft.location = softmap[soft.location]
1627
1628
1629    def new_experiment(self, req, fid):
1630        """
1631        The external interface to empty initial experiment creation called from
1632        the dispatcher.
1633
1634        Creates a working directory, splits the incoming description using the
1635        splitter script and parses out the avrious subsections using the
1636        lcasses above.  Once each sub-experiment is created, use pooled threads
1637        to instantiate them and start it all up.
1638        """
1639        req = req.get('NewRequestBody', None)
1640        if not req:
1641            raise service_error(service_error.req,
1642                    "Bad request format (no NewRequestBody)")
1643
1644        if self.auth.import_credentials(data_list=req.get('credential', [])):
1645            self.auth.save()
1646
1647        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1648                with_proof=True)
1649
1650        if not access_ok:
1651            raise service_error(service_error.access, "New access denied",
1652                    proof=[proof])
1653
1654        try:
1655            tmpdir = tempfile.mkdtemp(prefix="split-")
1656        except EnvironmentError:
1657            raise service_error(service_error.internal, "Cannot create tmp dir")
1658
1659        try:
1660            access_user = self.accessdb[fid]
1661        except KeyError:
1662            raise service_error(service_error.internal,
1663                    "Access map and authorizer out of sync in " + \
1664                            "new_experiment for fedid %s"  % fid)
1665
1666        # Generate an ID for the experiment (slice) and a certificate that the
1667        # allocator can use to prove they own it.  We'll ship it back through
1668        # the encrypted connection.  If the requester supplied one, use it.
1669        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1670            expcert = req['experimentAccess']['X509']
1671            expid = fedid(certstr=expcert)
1672            self.state_lock.acquire()
1673            if expid in self.state:
1674                self.state_lock.release()
1675                raise service_error(service_error.req, 
1676                        'fedid %s identifies an existing experiment' % expid)
1677            self.state_lock.release()
1678        else:
1679            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1680
1681        #now we're done with the tmpdir, and it should be empty
1682        if self.cleanup:
1683            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1684            os.rmdir(tmpdir)
1685        else:
1686            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1687
1688        eid = self.create_experiment_state(fid, req, expid, expcert, 
1689                state='empty')
1690
1691        rv = {
1692                'experimentID': [
1693                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1694                ],
1695                'experimentStatus': 'empty',
1696                'experimentAccess': { 'X509' : expcert },
1697                'proof': proof.to_dict(),
1698            }
1699
1700        return rv
1701
1702    # create_experiment sub-functions
1703
1704    @staticmethod
1705    def get_experiment_key(req, field='experimentID'):
1706        """
1707        Parse the experiment identifiers out of the request (the request body
1708        tag has been removed).  Specifically this pulls either the fedid or the
1709        localname out of the experimentID field.  A fedid is preferred.  If
1710        neither is present or the request does not contain the fields,
1711        service_errors are raised.
1712        """
1713        # Get the experiment access
1714        exp = req.get(field, None)
1715        if exp:
1716            if exp.has_key('fedid'):
1717                key = exp['fedid']
1718            elif exp.has_key('localname'):
1719                key = exp['localname']
1720            else:
1721                raise service_error(service_error.req, "Unknown lookup type")
1722        else:
1723            raise service_error(service_error.req, "No request?")
1724
1725        return key
1726
1727    def get_experiment_ids_and_start(self, key, tmpdir):
1728        """
1729        Get the experiment name, id and access certificate from the state, and
1730        set the experiment state to 'starting'.  returns a triple (fedid,
1731        localname, access_cert_file). The access_cert_file is a copy of the
1732        contents of the access certificate, created in the tempdir with
1733        restricted permissions.  If things are confused, raise an exception.
1734        """
1735
1736        expid = eid = None
1737        self.state_lock.acquire()
1738        if key in self.state:
1739            exp = self.state[key]
1740            exp.status = "starting"
1741            exp.updated()
1742            expid = exp.fedid
1743            eid = exp.localname
1744            expcert = exp.identity
1745        self.state_lock.release()
1746
1747        # make a protected copy of the access certificate so the experiment
1748        # controller can act as the experiment principal.
1749        if expcert:
1750            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1751            if not expcert_file:
1752                raise service_error(service_error.internal, 
1753                        "Cannot create temp cert file?")
1754        else:
1755            expcert_file = None
1756
1757        return (eid, expid, expcert_file)
1758
1759    def get_topology(self, req, tmpdir):
1760        """
1761        Get the ns2 content and put it into a file for parsing.  Call the local
1762        or remote parser and return the topdl.Topology.  Errors result in
1763        exceptions.  req is the request and tmpdir is a work directory.
1764        """
1765
1766        # The tcl parser needs to read a file so put the content into that file
1767        descr=req.get('experimentdescription', None)
1768        if descr:
1769            if 'ns2description' in descr:
1770                file_content=descr['ns2description']
1771            elif 'topdldescription' in descr:
1772                return topdl.Topology(**descr['topdldescription'])
1773            else:
1774                raise service_error(service_error.req, 
1775                        'Unknown experiment description type')
1776        else:
1777            raise service_error(service_error.req, "No experiment description")
1778
1779
1780        if self.splitter_url:
1781            self.log.debug("Calling remote topdl translator at %s" % \
1782                    self.splitter_url)
1783            top = self.remote_ns2topdl(self.splitter_url, file_content)
1784        else:
1785            tclfile = os.path.join(tmpdir, "experiment.tcl")
1786            if file_content:
1787                try:
1788                    f = open(tclfile, 'w')
1789                    f.write(file_content)
1790                    f.close()
1791                except EnvironmentError:
1792                    raise service_error(service_error.internal,
1793                            "Cannot write temp experiment description")
1794            else:
1795                raise service_error(service_error.req, 
1796                        "Only ns2descriptions supported")
1797            pid = "dummy"
1798            gid = "dummy"
1799            eid = "dummy"
1800
1801            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1802                str(self.muxmax), '-m', 'dummy']
1803
1804            tclcmd.extend([pid, gid, eid, tclfile])
1805
1806            self.log.debug("running local splitter %s", " ".join(tclcmd))
1807            # This is just fantastic.  As a side effect the parser copies
1808            # tb_compat.tcl into the current directory, so that directory
1809            # must be writable by the fedd user.  Doing this in the
1810            # temporary subdir ensures this is the case.
1811            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1812                    cwd=tmpdir)
1813            split_data = tclparser.stdout
1814
1815            top = topdl.topology_from_xml(file=split_data, top="experiment")
1816            os.remove(tclfile)
1817
1818        return top
1819
1820    def get_testbed_services(self, req, testbeds):
1821        """
1822        Parse the services section of the request into two dicts mapping
1823        testbed to lists of federated_service objects.  The first dict maps all
1824        exporters of services to those service objects, the second maps
1825        testbeds to service objects only for services requiring portals.
1826        """
1827        # We construct both dicts here because deriving the second is more
1828        # comples than it looks - both the keys and lists can differ, and it's
1829        # much easier to generate both in one pass.
1830        masters = { }
1831        pmasters = { }
1832        for s in req.get('service', []):
1833            # If this is a service request with the importall field
1834            # set, fill it out.
1835
1836            if s.get('importall', False):
1837                s['import'] = [ tb for tb in testbeds \
1838                        if tb not in s.get('export',[])]
1839                del s['importall']
1840
1841            # Add the service to masters
1842            for tb in s.get('export', []):
1843                if s.get('name', None):
1844
1845                    params = { }
1846                    for a in s.get('fedAttr', []):
1847                        params[a.get('attribute', '')] = a.get('value','')
1848
1849                    fser = federated_service(name=s['name'],
1850                            exporter=tb, importers=s.get('import',[]),
1851                            params=params)
1852                    if fser.name == 'hide_hosts' \
1853                            and 'hosts' not in fser.params:
1854                        fser.params['hosts'] = \
1855                                ",".join(tb_hosts.get(fser.exporter, []))
1856                    if tb in masters: masters[tb].append(fser)
1857                    else: masters[tb] = [fser]
1858
1859                    if fser.portal:
1860                        if tb in pmasters: pmasters[tb].append(fser)
1861                        else: pmasters[tb] = [fser]
1862                else:
1863                    self.log.error('Testbed service does not have name " + \
1864                            "and importers')
1865        return masters, pmasters
1866
1867    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1868        """
1869        Create the ssh keys necessary for interconnecting the portal nodes and
1870        the global hosts file for letting each segment know about the IP
1871        addresses in play.  Save these into the repo.  Add attributes to the
1872        autorizer allowing access controllers to download them and return a set
1873        of attributes that inform the segments where to find this stuff.  May
1874        raise service_errors in if there are problems.
1875        """
1876        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1877        gw_secretkey_base = "fed.%s" % self.ssh_type
1878        keydir = os.path.join(tmpdir, 'keys')
1879        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1880        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1881
1882        try:
1883            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1884        except ValueError:
1885            raise service_error(service_error.server_config, 
1886                    "Bad key type (%s)" % self.ssh_type)
1887
1888        self.generate_seer_certs(keydir)
1889
1890        # Copy configuration files into the remote file store
1891        # The config urlpath
1892        configpath = "/%s/config" % expid
1893        # The config file system location
1894        configdir ="%s%s" % ( self.repodir, configpath)
1895        try:
1896            os.makedirs(configdir)
1897        except EnvironmentError, e:
1898            raise service_error(service_error.internal,
1899                    "Cannot create config directory: %s" % e)
1900        try:
1901            f = open("%s/hosts" % configdir, "w")
1902            print >> f, string.join(hosts, '\n')
1903            f.close()
1904        except EnvironmentError, e:
1905            raise service_error(service_error.internal, 
1906                    "Cannot write hosts file: %s" % e)
1907        try:
1908            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1909            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1910            copy_file(os.path.join(keydir, 'ca.pem'), 
1911                    os.path.join(configdir, 'ca.pem'))
1912            copy_file(os.path.join(keydir, 'node.pem'), 
1913                    os.path.join(configdir, 'node.pem'))
1914        except EnvironmentError, e:
1915            raise service_error(service_error.internal, 
1916                    "Cannot copy keyfiles: %s" % e)
1917
1918        # Allow the individual testbeds to access the configuration files,
1919        # again by setting an attribute for the relevant pathnames on each
1920        # allocation principal.  Yeah, that's a long list comprehension.
1921        self.append_experiment_authorization(expid, set([
1922            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1923                    for tb in tbparams.keys() \
1924                        for f in ("hosts", 'ca.pem', 'node.pem', 
1925                            gw_secretkey_base, gw_pubkey_base)]))
1926
1927        attrs = [ 
1928                {
1929                    'attribute': 'ssh_pubkey', 
1930                    'value': '%s/%s/config/%s' % \
1931                            (self.repo_url, expid, gw_pubkey_base)
1932                },
1933                {
1934                    'attribute': 'ssh_secretkey', 
1935                    'value': '%s/%s/config/%s' % \
1936                            (self.repo_url, expid, gw_secretkey_base)
1937                },
1938                {
1939                    'attribute': 'hosts', 
1940                    'value': '%s/%s/config/hosts' % \
1941                            (self.repo_url, expid)
1942                },
1943                {
1944                    'attribute': 'seer_ca_pem', 
1945                    'value': '%s/%s/config/%s' % \
1946                            (self.repo_url, expid, 'ca.pem')
1947                },
1948                {
1949                    'attribute': 'seer_node_pem', 
1950                    'value': '%s/%s/config/%s' % \
1951                            (self.repo_url, expid, 'node.pem')
1952                },
1953            ]
1954        return attrs
1955
1956
1957    def get_vtopo(self, req, fid):
1958        """
1959        Return the stored virtual topology for this experiment
1960        """
1961        rv = None
1962        state = None
1963
1964        req = req.get('VtopoRequestBody', None)
1965        if not req:
1966            raise service_error(service_error.req,
1967                    "Bad request format (no VtopoRequestBody)")
1968        exp = req.get('experiment', None)
1969        if exp:
1970            if exp.has_key('fedid'):
1971                key = exp['fedid']
1972                keytype = "fedid"
1973            elif exp.has_key('localname'):
1974                key = exp['localname']
1975                keytype = "localname"
1976            else:
1977                raise service_error(service_error.req, "Unknown lookup type")
1978        else:
1979            raise service_error(service_error.req, "No request?")
1980
1981        proof = self.check_experiment_access(fid, key)
1982
1983        self.state_lock.acquire()
1984        # XXX: this needs to be recalculated
1985        if key in self.state:
1986            if self.state[key].top is not None:
1987                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1988                rv = { 'experiment' : {keytype: key },
1989                        'vtopo': vtopo,
1990                        'proof': proof.to_dict(), 
1991                    }
1992            else:
1993                state = self.state[key].status
1994        self.state_lock.release()
1995
1996        if rv: return rv
1997        else: 
1998            if state:
1999                raise service_error(service_error.partial, 
2000                        "Not ready: %s" % state)
2001            else:
2002                raise service_error(service_error.req, "No such experiment")
2003
2004    def get_vis(self, req, fid):
2005        """
2006        Return the stored visualization for this experiment
2007        """
2008        rv = None
2009        state = None
2010
2011        req = req.get('VisRequestBody', None)
2012        if not req:
2013            raise service_error(service_error.req,
2014                    "Bad request format (no VisRequestBody)")
2015        exp = req.get('experiment', None)
2016        if exp:
2017            if exp.has_key('fedid'):
2018                key = exp['fedid']
2019                keytype = "fedid"
2020            elif exp.has_key('localname'):
2021                key = exp['localname']
2022                keytype = "localname"
2023            else:
2024                raise service_error(service_error.req, "Unknown lookup type")
2025        else:
2026            raise service_error(service_error.req, "No request?")
2027
2028        proof = self.check_experiment_access(fid, key)
2029
2030        self.state_lock.acquire()
2031        # Generate the visualization
2032        if key in self.state:
2033            if self.state[key].top is not None:
2034                try:
2035                    vis = self.genviz(
2036                            topdl.topology_to_vtopo(self.state[key].topo))
2037                except service_error, e:
2038                    self.state_lock.release()
2039                    raise e
2040                rv =  { 'experiment' : {keytype: key },
2041                        'vis': vis,
2042                        'proof': proof.to_dict(), 
2043                        }
2044            else:
2045                state = self.state[key].status
2046        self.state_lock.release()
2047
2048        if rv: return rv
2049        else:
2050            if state:
2051                raise service_error(service_error.partial, 
2052                        "Not ready: %s" % state)
2053            else:
2054                raise service_error(service_error.req, "No such experiment")
2055
2056   
2057    def save_federant_information(self, allocated, tbparams, eid, top):
2058        """
2059        Store the various data that have changed in the experiment state
2060        between when it was started and the beginning of resource allocation.
2061        This is basically the information about each local allocation.  This
2062        fills in the values of the placeholder allocation in the state.  It
2063        also collects the access proofs and returns them as dicts for a
2064        response message.
2065        """
2066        self.state_lock.acquire()
2067        exp = self.state[eid]
2068        exp.top = top.clone()
2069        # save federant information
2070        for k in allocated.keys():
2071            exp.add_allocation(tbparams[k])
2072            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2073                type="testbed", localname=[k], 
2074                service=[ s.to_topdl() for s in tbparams[k].services]))
2075
2076        # Access proofs for the response message
2077        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2078                    for p in tbparams[k].proof]
2079        exp.updated()
2080        if self.state_filename: 
2081            self.write_state()
2082        self.state_lock.release()
2083        return proofs
2084
2085    def clear_placeholder(self, eid, expid, tmpdir):
2086        """
2087        Clear the placeholder and remove any allocated temporary dir.
2088        """
2089
2090        self.state_lock.acquire()
2091        del self.state[eid]
2092        del self.state[expid]
2093        if self.state_filename: self.write_state()
2094        self.state_lock.release()
2095        if tmpdir and self.cleanup:
2096            self.remove_dirs(tmpdir)
2097
2098    # end of create_experiment sub-functions
2099
2100    def create_experiment(self, req, fid):
2101        """
2102        The external interface to experiment creation called from the
2103        dispatcher.
2104
2105        Creates a working directory, splits the incoming description using the
2106        splitter script and parses out the various subsections using the
2107        classes above.  Once each sub-experiment is created, use pooled threads
2108        to instantiate them and start it all up.
2109        """
2110
2111        req = req.get('CreateRequestBody', None)
2112        if req:
2113            key = self.get_experiment_key(req)
2114        else:
2115            raise service_error(service_error.req,
2116                    "Bad request format (no CreateRequestBody)")
2117
2118        # Import information from the requester
2119        if self.auth.import_credentials(data_list=req.get('credential', [])):
2120            self.auth.save()
2121
2122        # Make sure that the caller can talk to us
2123        proof = self.check_experiment_access(fid, key)
2124
2125        # Install the testbed map entries supplied with the request into a copy
2126        # of the testbed map.
2127        tbmap = dict(self.tbmap)
2128        for m in req.get('testbedmap', []):
2129            if 'testbed' in m and 'uri' in m:
2130                tbmap[m['testbed']] = m['uri']
2131
2132        # a place to work
2133        try:
2134            tmpdir = tempfile.mkdtemp(prefix="split-")
2135            os.mkdir(tmpdir+"/keys")
2136        except EnvironmentError:
2137            raise service_error(service_error.internal, "Cannot create tmp dir")
2138
2139        tbparams = { }
2140
2141        eid, expid, expcert_file = \
2142                self.get_experiment_ids_and_start(key, tmpdir)
2143
2144        # This catches exceptions to clear the placeholder if necessary
2145        try: 
2146            if not (eid and expid):
2147                raise service_error(service_error.internal, 
2148                        "Cannot find local experiment info!?")
2149
2150            top = self.get_topology(req, tmpdir)
2151            self.confirm_software(top)
2152            # Assign the IPs
2153            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2154            # Find the testbeds to look up
2155            tb_hosts = { }
2156            testbeds = [ ]
2157            for e in top.elements:
2158                if isinstance(e, topdl.Computer):
2159                    tb = e.get_attribute('testbed') or 'default'
2160                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2161                    else: 
2162                        tb_hosts[tb] = [ e.name ]
2163                        testbeds.append(tb)
2164
2165            masters, pmasters = self.get_testbed_services(req, testbeds)
2166            allocated = { }         # Testbeds we can access
2167            topo ={ }               # Sub topologies
2168            connInfo = { }          # Connection information
2169
2170            self.split_topology(top, topo, testbeds)
2171
2172            self.get_access_to_testbeds(testbeds, fid, allocated, 
2173                    tbparams, masters, tbmap, expid, expcert_file)
2174
2175            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2176
2177            part = experiment_partition(self.auth, self.store_url, tbmap,
2178                    self.muxmax, self.direct_transit)
2179            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2180                    connInfo, expid)
2181
2182            auth_attrs = set()
2183            # Now get access to the dynamic testbeds (those added above)
2184            for tb in [ t for t in topo if t not in allocated]:
2185                self.get_access(tb, tbparams, fid, masters, tbmap, 
2186                        expid, expcert_file)
2187                allocated[tb] = 1
2188                store_keys = topo[tb].get_attribute('store_keys')
2189                # Give the testbed access to keys it exports or imports
2190                if store_keys:
2191                    auth_attrs.update(set([
2192                        (tbparams[tb].allocID, sk) \
2193                                for sk in store_keys.split(" ")]))
2194
2195            if auth_attrs:
2196                self.append_experiment_authorization(expid, auth_attrs)
2197
2198            # transit and disconnected testbeds may not have a connInfo entry.
2199            # Fill in the blanks.
2200            for t in allocated.keys():
2201                if not connInfo.has_key(t):
2202                    connInfo[t] = { }
2203
2204            self.wrangle_software(expid, top, topo, tbparams)
2205
2206            proofs = self.save_federant_information(allocated, tbparams, 
2207                    eid, top)
2208        except service_error, e:
2209            # If something goes wrong in the parse (usually an access error)
2210            # clear the placeholder state.  From here on out the code delays
2211            # exceptions.  Failing at this point returns a fault to the remote
2212            # caller.
2213            self.clear_placeholder(eid, expid, tmpdir)
2214            raise e
2215
2216        # Start the background swapper and return the starting state.  From
2217        # here on out, the state will stick around a while.
2218
2219        # Create a logger that logs to the experiment's state object as well as
2220        # to the main log file.
2221        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2222        alloc_collector = self.list_log(self.state[eid].log)
2223        h = logging.StreamHandler(alloc_collector)
2224        # XXX: there should be a global one of these rather than repeating the
2225        # code.
2226        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2227                    '%d %b %y %H:%M:%S'))
2228        alloc_log.addHandler(h)
2229
2230        # Start a thread to do the resource allocation
2231        t  = Thread(target=self.allocate_resources,
2232                args=(allocated, masters, eid, expid, tbparams, 
2233                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2234                    connInfo, tbmap, expcert_file),
2235                name=eid)
2236        t.start()
2237
2238        rv = {
2239                'experimentID': [
2240                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2241                ],
2242                'experimentStatus': 'starting',
2243                'proof': [ proof.to_dict() ] + proofs,
2244            }
2245
2246        return rv
2247   
2248    def get_experiment_fedid(self, key):
2249        """
2250        find the fedid associated with the localname key in the state database.
2251        """
2252
2253        rv = None
2254        self.state_lock.acquire()
2255        if key in self.state:
2256            rv = self.state[key].fedid
2257        self.state_lock.release()
2258        return rv
2259
2260    def check_experiment_access(self, fid, key):
2261        """
2262        Confirm that the fid has access to the experiment.  Though a request
2263        may be made in terms of a local name, the access attribute is always
2264        the experiment's fedid.
2265        """
2266        if not isinstance(key, fedid):
2267            key = self.get_experiment_fedid(key)
2268
2269        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2270
2271        if access_ok:
2272            return proof
2273        else:
2274            raise service_error(service_error.access, "Access Denied",
2275                proof)
2276
2277
2278    def get_handler(self, path, fid):
2279        """
2280        Perhaps surprisingly named, this function handles HTTP GET requests to
2281        this server (SOAP requests are POSTs).
2282        """
2283        self.log.info("Get handler %s %s" % (path, fid))
2284        # XXX: log proofs?
2285        if self.auth.check_attribute(fid, path):
2286            return ("%s/%s" % (self.repodir, path), "application/binary")
2287        else:
2288            return (None, None)
2289
2290    def update_info(self, key, force=False):
2291        top = None
2292        self.state_lock.acquire()
2293        if key in self.state:
2294            if force or self.state[key].older_than(self.info_cache_limit):
2295                top = self.state[key].top
2296                if top is not None: top = top.clone()
2297                d1, info_params, cert, d2 = \
2298                        self.get_segment_info(self.state[key], need_lock=False)
2299        self.state_lock.release()
2300
2301        if top is None: return
2302
2303        try:
2304            tmpdir = tempfile.mkdtemp(prefix="info-")
2305        except EnvironmentError:
2306            raise service_error(service_error.internal, 
2307                    "Cannot create tmp dir")
2308        cert_file = self.make_temp_certfile(cert, tmpdir)
2309
2310        data = []
2311        try:
2312            for k, (uri, aid) in info_params.items():
2313                info=self.info_segment(log=self.log, testbed=uri,
2314                            cert_file=cert_file, cert_pwd=None,
2315                            trusted_certs=self.trusted_certs,
2316                            caller=self.call_InfoSegment)
2317                info(uri, aid)
2318                data.append(info)
2319        # Clean up the tmpdir no matter what
2320        finally:
2321            if tmpdir: self.remove_dirs(tmpdir)
2322
2323        self.annotate_topology(top, data)
2324        self.state_lock.acquire()
2325        if key in self.state:
2326            self.state[key].top = top
2327            self.state[key].updated()
2328            if self.state_filename: self.write_state()
2329        self.state_lock.release()
2330
2331   
2332    def get_info(self, req, fid):
2333        """
2334        Return all the stored info about this experiment
2335        """
2336        rv = None
2337
2338        req = req.get('InfoRequestBody', None)
2339        if not req:
2340            raise service_error(service_error.req,
2341                    "Bad request format (no InfoRequestBody)")
2342        exp = req.get('experiment', None)
2343        legacy = req.get('legacy', False)
2344        fresh = req.get('fresh', False)
2345        if exp:
2346            if exp.has_key('fedid'):
2347                key = exp['fedid']
2348                keytype = "fedid"
2349            elif exp.has_key('localname'):
2350                key = exp['localname']
2351                keytype = "localname"
2352            else:
2353                raise service_error(service_error.req, "Unknown lookup type")
2354        else:
2355            raise service_error(service_error.req, "No request?")
2356
2357        proof = self.check_experiment_access(fid, key)
2358
2359        self.update_info(key, fresh)
2360
2361        self.state_lock.acquire()
2362        if self.state.has_key(key):
2363            rv = self.state[key].get_info()
2364            # Copy the topo if we need legacy annotations
2365            if legacy:
2366                top = self.state[key].top
2367                if top is not None: top = top.clone()
2368        self.state_lock.release()
2369
2370        # If the legacy visualization and topology representations are
2371        # requested, calculate them and add them to the return.
2372        if legacy and rv is not None:
2373            if top is not None:
2374                vtopo = topdl.topology_to_vtopo(top)
2375                if vtopo is not None:
2376                    rv['vtopo'] = vtopo
2377                    try:
2378                        vis = self.genviz(vtopo)
2379                    except service_error, e:
2380                        self.log.debug('Problem generating visualization: %s' \
2381                                % e)
2382                        vis = None
2383                    if vis is not None:
2384                        rv['vis'] = vis
2385        if rv:
2386            rv['proof'] = proof.to_dict()
2387            return rv
2388        else:
2389            raise service_error(service_error.req, "No such experiment")
2390
2391    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2392            results):
2393        """
2394        Call OperateSegment on multiple testbeds and gather the results.
2395        op_params contains the parameters needed to contact that testbed, cert
2396        is a certificate containing the fedid to use, op is the operation,
2397        testbeds is a dict mapping testbed name to targets in that testbed,
2398        params are the parameters to include a,d results is a growing list of
2399        the results of the calls.
2400        """
2401        try:
2402            tmpdir = tempfile.mkdtemp(prefix="info-")
2403        except EnvironmentError:
2404            raise service_error(service_error.internal, 
2405                    "Cannot create tmp dir")
2406        cert_file = self.make_temp_certfile(cert, tmpdir)
2407
2408        try:
2409            for tb, targets in testbeds.items():
2410                if tb in op_params:
2411                    uri, aid = op_params[tb]
2412                    operate=self.operation_segment(log=self.log, testbed=uri,
2413                                cert_file=cert_file, cert_pwd=None,
2414                                trusted_certs=self.trusted_certs,
2415                                caller=self.call_OperationSegment)
2416                    if operate(uri, aid, op, targets, params):
2417                        if operate.status is not None:
2418                            results.extend(operate.status)
2419                            continue
2420                # Something went wrong in a weird way.  Add statuses
2421                # that reflect that to results
2422                for t in targets:
2423                    results.append(operation_status(t, 
2424                        operation_status.federant,
2425                        'Unexpected error on %s' % tb))
2426        # Clean up the tmpdir no matter what
2427        finally:
2428            if tmpdir: self.remove_dirs(tmpdir)
2429
2430    def do_operation(self, req, fid):
2431        """
2432        Find the testbeds holding each target and ask them to carry out the
2433        operation.  Return the statuses.
2434        """
2435        # Map an element to the testbed containing it
2436        def element_to_tb(e):
2437            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2438            elif isinstance(e, topdl.Testbed): return e.name
2439            else: return None
2440        # If d is an operation_status object, make it a dict
2441        def make_dict(d):
2442            if isinstance(d, dict): return d
2443            elif isinstance(d, operation_status): return d.to_dict()
2444            else: return { }
2445
2446        def element_name(e):
2447            if isinstance(e, topdl.Computer): return e.name
2448            elif isinstance(e, topdl.Testbed): 
2449                if e.localname: return e.localname[0]
2450                else: return None
2451            else: return None
2452
2453        req = req.get('OperationRequestBody', None)
2454        if not req:
2455            raise service_error(service_error.req,
2456                    "Bad request format (no OperationRequestBody)")
2457        exp = req.get('experiment', None)
2458        op = req.get('operation', None)
2459        targets = set(req.get('target', []))
2460        params = req.get('parameter', None)
2461
2462        if exp:
2463            if 'fedid' in exp:
2464                key = exp['fedid']
2465                keytype = "fedid"
2466            elif 'localname' in exp:
2467                key = exp['localname']
2468                keytype = "localname"
2469            else:
2470                raise service_error(service_error.req, "Unknown lookup type")
2471        else:
2472            raise service_error(service_error.req, "No request?")
2473
2474        if op is None or not targets:
2475            raise service_error(service_error.req, "No request?")
2476
2477        proof = self.check_experiment_access(fid, key)
2478        self.state_lock.acquire()
2479        if key in self.state:
2480            d1, op_params, cert, d2 = \
2481                    self.get_segment_info(self.state[key], need_lock=False,
2482                            key='tb')
2483            top = self.state[key].top
2484            if top is not None:
2485                top = top.clone()
2486        self.state_lock.release()
2487
2488        if top is None:
2489            raise service_error(service_error.partial, "No topology yet", 
2490                    proof=proof)
2491
2492        testbeds = { }
2493        results = []
2494        for e in top.elements:
2495            ename = element_name(e)
2496            if ename in targets:
2497                tb = element_to_tb(e)
2498                targets.remove(ename)
2499                if tb is not None:
2500                    if tb in testbeds: testbeds[tb].append(ename)
2501                    else: testbeds[tb] = [ ename ]
2502                else:
2503                    results.append(operation_status(e.name, 
2504                        code=operation_status.no_target, 
2505                        description='Cannot map target to testbed'))
2506
2507        for t in targets:
2508            results.append(operation_status(t, operation_status.no_target))
2509
2510        self.operate_on_segments(op_params, cert, op, testbeds, params,
2511                results)
2512
2513        return { 
2514                'experiment': exp, 
2515                'status': [make_dict(r) for r in results],
2516                'proof': proof.to_dict()
2517                }
2518
2519
2520    def get_multi_info(self, req, fid):
2521        """
2522        Return all the stored info that this fedid can access
2523        """
2524        rv = { 'info': [ ], 'proof': [ ] }
2525
2526        self.state_lock.acquire()
2527        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2528            try:
2529                proof = self.check_experiment_access(fid, key)
2530            except service_error, e:
2531                if e.code == service_error.access:
2532                    continue
2533                else:
2534                    self.state_lock.release()
2535                    raise e
2536
2537            if self.state.has_key(key):
2538                e = self.state[key].get_info()
2539                e['proof'] = proof.to_dict()
2540                rv['info'].append(e)
2541                rv['proof'].append(proof.to_dict())
2542        self.state_lock.release()
2543        return rv
2544
2545    def check_termination_status(self, fed_exp, force):
2546        """
2547        Confirm that the experiment is sin a valid state to stop (or force it)
2548        return the state - invalid states for deletion and force settings cause
2549        exceptions.
2550        """
2551        self.state_lock.acquire()
2552        status = fed_exp.status
2553
2554        if status:
2555            if status in ('starting', 'terminating'):
2556                if not force:
2557                    self.state_lock.release()
2558                    raise service_error(service_error.partial, 
2559                            'Experiment still being created or destroyed')
2560                else:
2561                    self.log.warning('Experiment in %s state ' % status + \
2562                            'being terminated by force.')
2563            self.state_lock.release()
2564            return status
2565        else:
2566            # No status??? trouble
2567            self.state_lock.release()
2568            raise service_error(service_error.internal,
2569                    "Experiment has no status!?")
2570
2571    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2572        ids = []
2573        term_params = { }
2574        if need_lock: self.state_lock.acquire()
2575        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2576        expcert = fed_exp.identity
2577        repo = "%s" % fed_exp.fedid
2578
2579        # Collect the allocation/segment ids into a dict keyed by the fedid
2580        # of the allocation that contains a tuple of uri, aid
2581        for i, fed in enumerate(fed_exp.get_all_allocations()):
2582            uri = fed.uri
2583            aid = fed.allocID
2584            if key == 'aid': term_params[aid] = (uri, aid)
2585            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2586
2587        if need_lock: self.state_lock.release()
2588        return ids, term_params, expcert, repo
2589
2590
2591    def get_termination_info(self, fed_exp):
2592        self.state_lock.acquire()
2593        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2594        # Change the experiment state
2595        fed_exp.status = 'terminating'
2596        fed_exp.updated()
2597        if self.state_filename: self.write_state()
2598        self.state_lock.release()
2599
2600        return ids, term_params, expcert, repo
2601
2602
2603    def deallocate_resources(self, term_params, expcert, status, force, 
2604            dealloc_log):
2605        tmpdir = None
2606        # This try block makes sure the tempdir is cleared
2607        try:
2608            # If no expcert, try the deallocation as the experiment
2609            # controller instance.
2610            if expcert and self.auth_type != 'legacy': 
2611                try:
2612                    tmpdir = tempfile.mkdtemp(prefix="term-")
2613                except EnvironmentError:
2614                    raise service_error(service_error.internal, 
2615                            "Cannot create tmp dir")
2616                cert_file = self.make_temp_certfile(expcert, tmpdir)
2617                pw = None
2618            else: 
2619                cert_file = self.cert_file
2620                pw = self.cert_pwd
2621
2622            # Stop everyone.  NB, wait_for_all waits until a thread starts
2623            # and then completes, so we can't wait if nothing starts.  So,
2624            # no tbparams, no start.
2625            if len(term_params) > 0:
2626                tp = thread_pool(self.nthreads)
2627                for k, (uri, aid) in term_params.items():
2628                    # Create and start a thread to stop the segment
2629                    tp.wait_for_slot()
2630                    t  = pooled_thread(\
2631                            target=self.terminate_segment(log=dealloc_log,
2632                                testbed=uri,
2633                                cert_file=cert_file, 
2634                                cert_pwd=pw,
2635                                trusted_certs=self.trusted_certs,
2636                                caller=self.call_TerminateSegment),
2637                            args=(uri, aid), name=k,
2638                            pdata=tp, trace_file=self.trace_file)
2639                    t.start()
2640                # Wait for completions
2641                tp.wait_for_all_done()
2642
2643            # release the allocations (failed experiments have done this
2644            # already, and starting experiments may be in odd states, so we
2645            # ignore errors releasing those allocations
2646            try: 
2647                for k, (uri, aid)  in term_params.items():
2648                    self.release_access(None, aid, uri=uri,
2649                            cert_file=cert_file, cert_pwd=pw)
2650            except service_error, e:
2651                if status != 'failed' and not force:
2652                    raise e
2653
2654        # Clean up the tmpdir no matter what
2655        finally:
2656            if tmpdir: self.remove_dirs(tmpdir)
2657
2658    def terminate_experiment(self, req, fid):
2659        """
2660        Swap this experiment out on the federants and delete the shared
2661        information
2662        """
2663        tbparams = { }
2664        req = req.get('TerminateRequestBody', None)
2665        if not req:
2666            raise service_error(service_error.req,
2667                    "Bad request format (no TerminateRequestBody)")
2668
2669        key = self.get_experiment_key(req, 'experiment')
2670        proof = self.check_experiment_access(fid, key)
2671        exp = req.get('experiment', False)
2672        force = req.get('force', False)
2673
2674        dealloc_list = [ ]
2675
2676
2677        # Create a logger that logs to the dealloc_list as well as to the main
2678        # log file.
2679        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2680        h = logging.StreamHandler(self.list_log(dealloc_list))
2681        # XXX: there should be a global one of these rather than repeating the
2682        # code.
2683        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2684                    '%d %b %y %H:%M:%S'))
2685        dealloc_log.addHandler(h)
2686
2687        self.state_lock.acquire()
2688        fed_exp = self.state.get(key, None)
2689        self.state_lock.release()
2690        repo = None
2691
2692        if fed_exp:
2693            status = self.check_termination_status(fed_exp, force)
2694            # get_termination_info updates the experiment state
2695            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2696            self.deallocate_resources(term_params, expcert, status, force, 
2697                    dealloc_log)
2698
2699            # Remove the terminated experiment
2700            self.state_lock.acquire()
2701            for id in ids:
2702                self.clear_experiment_authorization(id, need_state_lock=False)
2703                if id in self.state: del self.state[id]
2704
2705            if self.state_filename: self.write_state()
2706            self.state_lock.release()
2707
2708            # Delete any synch points associated with this experiment.  All
2709            # synch points begin with the fedid of the experiment.
2710            fedid_keys = set(["fedid:%s" % f for f in ids \
2711                    if isinstance(f, fedid)])
2712            for k in self.synch_store.all_keys():
2713                try:
2714                    if len(k) > 45 and k[0:46] in fedid_keys:
2715                        self.synch_store.del_value(k)
2716                except synch_store.BadDeletionError:
2717                    pass
2718            self.write_store()
2719
2720            # Remove software and other cached stuff from the filesystem.
2721            if repo:
2722                self.remove_dirs("%s/%s" % (self.repodir, repo))
2723       
2724            return { 
2725                    'experiment': exp , 
2726                    'deallocationLog': string.join(dealloc_list, ''),
2727                    'proof': [proof.to_dict()],
2728                    }
2729        else:
2730            raise service_error(service_error.req, "No saved state")
2731
2732
2733    def GetValue(self, req, fid):
2734        """
2735        Get a value from the synchronized store
2736        """
2737        req = req.get('GetValueRequestBody', None)
2738        if not req:
2739            raise service_error(service_error.req,
2740                    "Bad request format (no GetValueRequestBody)")
2741       
2742        name = req.get('name', None)
2743        wait = req.get('wait', False)
2744        rv = { 'name': name }
2745
2746        if not name:
2747            raise service_error(service_error.req, "No name?")
2748
2749        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2750
2751        if access_ok:
2752            self.log.debug("[GetValue] asking for %s " % name)
2753            try:
2754                v = self.synch_store.get_value(name, wait)
2755            except synch_store.RevokedKeyError:
2756                # No more synch on this key
2757                raise service_error(service_error.federant, 
2758                        "Synch key %s revoked" % name)
2759            if v is not None:
2760                rv['value'] = v
2761            rv['proof'] = proof.to_dict()
2762            self.log.debug("[GetValue] got %s from %s" % (v, name))
2763            return rv
2764        else:
2765            raise service_error(service_error.access, "Access Denied",
2766                    proof=proof)
2767       
2768
2769    def SetValue(self, req, fid):
2770        """
2771        Set a value in the synchronized store
2772        """
2773        req = req.get('SetValueRequestBody', None)
2774        if not req:
2775            raise service_error(service_error.req,
2776                    "Bad request format (no SetValueRequestBody)")
2777       
2778        name = req.get('name', None)
2779        v = req.get('value', '')
2780
2781        if not name:
2782            raise service_error(service_error.req, "No name?")
2783
2784        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2785
2786        if access_ok:
2787            try:
2788                self.synch_store.set_value(name, v)
2789                self.write_store()
2790                self.log.debug("[SetValue] set %s to %s" % (name, v))
2791            except synch_store.CollisionError:
2792                # Translate into a service_error
2793                raise service_error(service_error.req,
2794                        "Value already set: %s" %name)
2795            except synch_store.RevokedKeyError:
2796                # No more synch on this key
2797                raise service_error(service_error.federant, 
2798                        "Synch key %s revoked" % name)
2799                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2800        else:
2801            raise service_error(service_error.access, "Access Denied",
2802                    proof=proof)
Note: See TracBrowser for help on using the repository browser.