source: fedd/federation/experiment_control.py @ a69de97

compt_changes
Last change on this file since a69de97 was a69de97, checked in by Ted Faber <faber@…>, 12 years ago

Add logging of termination

  • Property mode set to 100644
File size: 92.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        """
388
389        self.tbmap = { }
390        lineno =0
391        try:
392            f = open(file, "r")
393            for line in f:
394                lineno += 1
395                line = line.strip()
396                if line.startswith('#') or len(line) == 0:
397                    continue
398                try:
399                    label, url = line.split(':', 1)
400                    self.tbmap[label] = url
401                except ValueError, e:
402                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
403                            "map db: %s %s" % (lineno, line, e))
404        except EnvironmentError, e:
405            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
406                    "open %s: %s" % (file, e))
407        else:
408            f.close()
409
410    def read_store(self):
411        try:
412            self.synch_store = synch_store()
413            self.synch_store.load(self.store_filename)
414            self.log.debug("[read_store]: Read store from %s" % \
415                    self.store_filename)
416        except EnvironmentError, e:
417            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
418                    % (self.state_filename, e))
419            self.synch_store = synch_store()
420
421        # Set the initial permissions on data in the store.  XXX: This ad hoc
422        # authorization attribute initialization is getting out of hand.
423        # XXX: legacy
424        if self.auth_type == 'legacy':
425            for k in self.synch_store.all_keys():
426                try:
427                    if k.startswith('fedid:'):
428                        fid = fedid(hexstr=k[6:46])
429                        if self.state.has_key(fid):
430                            for a in self.get_alloc_ids(self.state[fid]):
431                                self.auth.set_attribute(a, k)
432                except ValueError, e:
433                    self.log.warn("Cannot deduce permissions for %s" % k)
434
435
436    def write_store(self):
437        """
438        Write a new copy of synch_store after writing current state
439        to a backup.  We use the internal synch_store pickle method to avoid
440        incinsistent data.
441
442        State format is a simple pickling of the store.
443        """
444        if os.access(self.store_filename, os.W_OK):
445            copy_file(self.store_filename, \
446                    "%s.bak" % self.store_filename)
447        try:
448            self.synch_store.save(self.store_filename)
449        except EnvironmentError, e:
450            self.log.error("Can't write file %s: %s" % \
451                    (self.store_filename, e))
452        except TypeError, e:
453            self.log.error("Pickling problem (TypeError): %s" % e)
454
455
456    def remove_dirs(self, dir):
457        """
458        Remove the directory tree and all files rooted at dir.  Log any errors,
459        but continue.
460        """
461        self.log.debug("[removedirs]: removing %s" % dir)
462        try:
463            for path, dirs, files in os.walk(dir, topdown=False):
464                for f in files:
465                    os.remove(os.path.join(path, f))
466                for d in dirs:
467                    os.rmdir(os.path.join(path, d))
468            os.rmdir(dir)
469        except EnvironmentError, e:
470            self.log.error("Error deleting directory tree in %s" % e);
471
472    @staticmethod
473    def make_temp_certfile(expcert, tmpdir):
474        """
475        make a protected copy of the access certificate so the experiment
476        controller can act as the experiment principal.  mkstemp is the most
477        secure way to do that. The directory should be created by
478        mkdtemp.  Return the filename.
479        """
480        if expcert and tmpdir:
481            try:
482                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
483                f = os.fdopen(certf, 'w')
484                print >> f, expcert
485                f.close()
486            except EnvironmentError, e:
487                raise service_error(service_error.internal, 
488                        "Cannot create temp cert file?")
489            return certfn
490        else:
491            return None
492
493       
494    def generate_ssh_keys(self, dest, type="rsa" ):
495        """
496        Generate a set of keys for the gateways to use to talk.
497
498        Keys are of type type and are stored in the required dest file.
499        """
500        valid_types = ("rsa", "dsa")
501        t = type.lower();
502        if t not in valid_types: raise ValueError
503        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
504
505        try:
506            trace = open("/dev/null", "w")
507        except EnvironmentError:
508            raise service_error(service_error.internal,
509                    "Cannot open /dev/null??");
510
511        # May raise CalledProcessError
512        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
513        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
514        if rv != 0:
515            raise service_error(service_error.internal, 
516                    "Cannot generate nonce ssh keys.  %s return code %d" \
517                            % (self.ssh_keygen, rv))
518
519    def generate_seer_certs(self, destdir):
520        '''
521        Create a SEER ca cert and a node cert in destdir/ca.pem and
522        destdir/node.pem respectively.  These will be distributed throughout
523        the federated experiment.  This routine reports errors via
524        service_errors.
525        '''
526        openssl = '/usr/bin/openssl'
527        # All the filenames and parameters we need for openssl calls below
528        ca_key =os.path.join(destdir, 'ca.key') 
529        ca_pem = os.path.join(destdir, 'ca.pem')
530        node_key =os.path.join(destdir, 'node.key') 
531        node_pem = os.path.join(destdir, 'node.pem')
532        node_req = os.path.join(destdir, 'node.req')
533        node_signed = os.path.join(destdir, 'node.signed')
534        days = '%s' % (365 * 10)
535        serial = '%s' % random.randint(0, 1<<16)
536
537        try:
538            # Sequence of calls to create a CA key, create a ca cert, create a
539            # node key, node signing request, and finally a signed node
540            # certificate.
541            sequence = (
542                    (openssl, 'genrsa', '-out', ca_key, '1024'),
543                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
544                        ca_pem, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
546                    (openssl, 'genrsa', '-out', node_key, '1024'),
547                    (openssl, 'req', '-new', '-key', node_key, '-out', 
548                        node_req, '-days', days, '-subj', 
549                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
550                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
551                        '-set_serial', serial, '-req', '-in', node_req, 
552                        '-out', node_signed, '-days', days),
553                )
554            # Do all that stuff; bail if there's an error, and push all the
555            # output to dev/null.
556            for cmd in sequence:
557                trace = open("/dev/null", "w")
558                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
559                if rv != 0:
560                    raise service_error(service_error.internal, 
561                            "Cannot generate SEER certs.  %s return code %d" \
562                                    % (' '.join(cmd), rv))
563            # Concatinate the node key and signed certificate into node.pem
564            f = open(node_pem, 'w')
565            for comp in (node_signed, node_key):
566                g = open(comp, 'r')
567                f.write(g.read())
568                g.close()
569            f.close()
570
571            # Throw out intermediaries.
572            for fn in (ca_key, node_key, node_req, node_signed):
573                os.unlink(fn)
574
575        except EnvironmentError, e:
576            # Any difficulties with the file system wind up here
577            raise service_error(service_error.internal,
578                    "File error on  %s while creating SEER certs: %s" % \
579                            (e.filename, e.strerror))
580
581
582
583    def gentopo(self, str):
584        """
585        Generate the topology data structure from the splitter's XML
586        representation of it.
587
588        The topology XML looks like:
589            <experiment>
590                <nodes>
591                    <node><vname></vname><ips>ip1:ip2</ips></node>
592                </nodes>
593                <lans>
594                    <lan>
595                        <vname></vname><vnode></vnode><ip></ip>
596                        <bandwidth></bandwidth><member>node:port</member>
597                    </lan>
598                </lans>
599        """
600        class topo_parse:
601            """
602            Parse the topology XML and create the dats structure.
603            """
604            def __init__(self):
605                # Typing of the subelements for data conversion
606                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
607                self.int_subelements = ( 'bandwidth',)
608                self.float_subelements = ( 'delay',)
609                # The final data structure
610                self.nodes = [ ]
611                self.lans =  [ ]
612                self.topo = { \
613                        'node': self.nodes,\
614                        'lan' : self.lans,\
615                    }
616                self.element = { }  # Current element being created
617                self.chars = ""     # Last text seen
618
619            def end_element(self, name):
620                # After each sub element the contents is added to the current
621                # element or to the appropriate list.
622                if name == 'node':
623                    self.nodes.append(self.element)
624                    self.element = { }
625                elif name == 'lan':
626                    self.lans.append(self.element)
627                    self.element = { }
628                elif name in self.str_subelements:
629                    self.element[name] = self.chars
630                    self.chars = ""
631                elif name in self.int_subelements:
632                    self.element[name] = int(self.chars)
633                    self.chars = ""
634                elif name in self.float_subelements:
635                    self.element[name] = float(self.chars)
636                    self.chars = ""
637
638            def found_chars(self, data):
639                self.chars += data.rstrip()
640
641
642        tp = topo_parse();
643        parser = xml.parsers.expat.ParserCreate()
644        parser.EndElementHandler = tp.end_element
645        parser.CharacterDataHandler = tp.found_chars
646
647        parser.Parse(str)
648
649        return tp.topo
650       
651
652    def genviz(self, topo):
653        """
654        Generate the visualization the virtual topology
655        """
656
657        neato = "/usr/local/bin/neato"
658        # These are used to parse neato output and to create the visualization
659        # file.
660        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
661        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
662                "%s</type></node>"
663
664        try:
665            # Node names
666            nodes = [ n['vname'] for n in topo['node'] ]
667            topo_lans = topo['lan']
668        except KeyError, e:
669            raise service_error(service_error.internal, "Bad topology: %s" %e)
670
671        lans = { }
672        links = { }
673
674        # Walk through the virtual topology, organizing the connections into
675        # 2-node connections (links) and more-than-2-node connections (lans).
676        # When a lan is created, it's added to the list of nodes (there's a
677        # node in the visualization for the lan).
678        for l in topo_lans:
679            if links.has_key(l['vname']):
680                if len(links[l['vname']]) < 2:
681                    links[l['vname']].append(l['vnode'])
682                else:
683                    nodes.append(l['vname'])
684                    lans[l['vname']] = links[l['vname']]
685                    del links[l['vname']]
686                    lans[l['vname']].append(l['vnode'])
687            elif lans.has_key(l['vname']):
688                lans[l['vname']].append(l['vnode'])
689            else:
690                links[l['vname']] = [ l['vnode'] ]
691
692
693        # Open up a temporary file for dot to turn into a visualization
694        try:
695            df, dotname = tempfile.mkstemp()
696            dotfile = os.fdopen(df, 'w')
697        except EnvironmentError:
698            raise service_error(service_error.internal,
699                    "Failed to open file in genviz")
700
701        try:
702            dnull = open('/dev/null', 'w')
703        except EnvironmentError:
704            service_error(service_error.internal,
705                    "Failed to open /dev/null in genviz")
706
707        # Generate a dot/neato input file from the links, nodes and lans
708        try:
709            print >>dotfile, "graph G {"
710            for n in nodes:
711                print >>dotfile, '\t"%s"' % n
712            for l in links.keys():
713                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
714            for l in lans.keys():
715                for n in lans[l]:
716                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
717            print >>dotfile, "}"
718            dotfile.close()
719        except TypeError:
720            raise service_error(service_error.internal,
721                    "Single endpoint link in vtopo")
722        except EnvironmentError:
723            raise service_error(service_error.internal, "Cannot write dot file")
724
725        # Use dot to create a visualization
726        try:
727            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
728                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
729                stderr=dnull, close_fds=True)
730        except EnvironmentError:
731            raise service_error(service_error.internal, 
732                    "Cannot generate visualization: is graphviz available?")
733        dnull.close()
734
735        # Translate dot to vis format
736        vis_nodes = [ ]
737        vis = { 'node': vis_nodes }
738        for line in dot.stdout:
739            m = vis_re.match(line)
740            if m:
741                vn = m.group(1)
742                vis_node = {'name': vn, \
743                        'x': float(m.group(2)),\
744                        'y' : float(m.group(3)),\
745                    }
746                if vn in links.keys() or vn in lans.keys():
747                    vis_node['type'] = 'lan'
748                else:
749                    vis_node['type'] = 'node'
750                vis_nodes.append(vis_node)
751        rv = dot.wait()
752
753        os.remove(dotname)
754        # XXX: graphviz seems to use low return codes for warnings, like
755        # "couldn't find font"
756        if rv < 2 : return vis
757        else: return None
758
759
760    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
761            cert_pwd=None):
762        """
763        Release access to testbed through fedd
764        """
765
766        if not uri and tbmap:
767            uri = tbmap.get(tb, None)
768        if not uri:
769            raise service_error(service_error.server_config, 
770                    "Unknown testbed: %s" % tb)
771
772        if self.local_access.has_key(uri):
773            resp = self.local_access[uri].ReleaseAccess(\
774                    { 'ReleaseAccessRequestBody' : 
775                        {'allocID': {'fedid': aid}},}, 
776                    fedid(file=cert_file))
777            resp = { 'ReleaseAccessResponseBody': resp } 
778        else:
779            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
780                    cert_file, cert_pwd, self.trusted_certs)
781
782        # better error coding
783
784    def remote_ns2topdl(self, uri, desc):
785
786        req = {
787                'description' : { 'ns2description': desc },
788            }
789
790        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
791                self.trusted_certs)
792
793        if r.has_key('Ns2TopdlResponseBody'):
794            r = r['Ns2TopdlResponseBody']
795            ed = r.get('experimentdescription', None)
796            if ed.has_key('topdldescription'):
797                return topdl.Topology(**ed['topdldescription'])
798            else:
799                raise service_error(service_error.protocol, 
800                        "Bad splitter response (no output)")
801        else:
802            raise service_error(service_error.protocol, "Bad splitter response")
803
804    class start_segment:
805        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
806                cert_pwd=None, trusted_certs=None, caller=None,
807                log_collector=None):
808            self.log = log
809            self.debug = debug
810            self.cert_file = cert_file
811            self.cert_pwd = cert_pwd
812            self.trusted_certs = None
813            self.caller = caller
814            self.testbed = testbed
815            self.log_collector = log_collector
816            self.response = None
817            self.node = { }
818            self.proof = None
819
820        def make_map(self, resp):
821            if 'segmentdescription' not in resp  or \
822                    'topdldescription' not in resp['segmentdescription']:
823                self.log.warn('No topology returned from startsegment')
824                return 
825
826            top = topdl.Topology(
827                    **resp['segmentdescription']['topdldescription'])
828
829            for e in top.elements:
830                if isinstance(e, topdl.Computer):
831                    self.node[e.name] = e
832
833        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
834            req = {
835                    'allocID': { 'fedid' : aid }, 
836                    'segmentdescription': { 
837                        'topdldescription': topo.to_dict(),
838                    },
839                }
840
841            if connInfo:
842                req['connection'] = connInfo
843
844            import_svcs = [ s for m in masters.values() \
845                    for s in m if self.testbed in s.importers]
846
847            if import_svcs or self.testbed in masters:
848                req['service'] = []
849
850            for s in import_svcs:
851                for r in s.reqs:
852                    sr = copy.deepcopy(r)
853                    sr['visibility'] = 'import';
854                    req['service'].append(sr)
855
856            for s in masters.get(self.testbed, []):
857                for r in s.reqs:
858                    sr = copy.deepcopy(r)
859                    sr['visibility'] = 'export';
860                    req['service'].append(sr)
861
862            if attrs:
863                req['fedAttr'] = attrs
864
865            try:
866                self.log.debug("Calling StartSegment at %s " % uri)
867                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
868                        self.trusted_certs)
869                if r.has_key('StartSegmentResponseBody'):
870                    lval = r['StartSegmentResponseBody'].get('allocationLog',
871                            None)
872                    if lval and self.log_collector:
873                        for line in  lval.splitlines(True):
874                            self.log_collector.write(line)
875                    self.make_map(r['StartSegmentResponseBody'])
876                    if 'proof' in r: self.proof = r['proof']
877                    self.response = r
878                else:
879                    raise service_error(service_error.internal, 
880                            "Bad response!?: %s" %r)
881                return True
882            except service_error, e:
883                self.log.error("Start segment failed on %s: %s" % \
884                        (self.testbed, e))
885                return False
886
887
888
889    class terminate_segment:
890        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
891                cert_pwd=None, trusted_certs=None, caller=None):
892            self.log = log
893            self.debug = debug
894            self.cert_file = cert_file
895            self.cert_pwd = cert_pwd
896            self.trusted_certs = None
897            self.caller = caller
898            self.testbed = testbed
899
900        def __call__(self, uri, aid ):
901            req = {
902                    'allocID': {'fedid': aid }, 
903                }
904            self.log.info("Calling terminate segment")
905            try:
906                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
907                        self.trusted_certs)
908                self.log.info("Terminate segment succeeded")
909                return True
910            except service_error, e:
911                self.log.error("Terminate segment failed on %s: %s" % \
912                        (self.testbed, e))
913                return False
914
915    class info_segment(start_segment):
916        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
917                cert_pwd=None, trusted_certs=None, caller=None,
918                log_collector=None):
919            experiment_control_local.start_segment.__init__(self, debug, 
920                    log, testbed, cert_file, cert_pwd, trusted_certs, 
921                    caller, log_collector)
922
923        def __call__(self, uri, aid):
924            req = { 'allocID': { 'fedid' : aid } }
925
926            try:
927                self.log.debug("Calling InfoSegment at %s " % uri)
928                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
929                        self.trusted_certs)
930                if r.has_key('InfoSegmentResponseBody'):
931                    self.make_map(r['InfoSegmentResponseBody'])
932                    if 'proof' in r: self.proof = r['proof']
933                    self.response = r
934                else:
935                    raise service_error(service_error.internal, 
936                            "Bad response!?: %s" %r)
937                return True
938            except service_error, e:
939                self.log.error("Info segment failed on %s: %s" % \
940                        (self.testbed, e))
941                return False
942
943    class operation_segment:
944        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
945                cert_pwd=None, trusted_certs=None, caller=None,
946                log_collector=None):
947            self.log = log
948            self.debug = debug
949            self.cert_file = cert_file
950            self.cert_pwd = cert_pwd
951            self.trusted_certs = None
952            self.caller = caller
953            self.testbed = testbed
954            self.status = None
955
956        def __call__(self, uri, aid, op, targets, params):
957            req = { 
958                    'allocID': { 'fedid' : aid },
959                    'operation': op,
960                    'target': targets,
961                    }
962            if params: req['parameter'] = params
963
964
965            try:
966                self.log.debug("Calling OperationSegment at %s " % uri)
967                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
968                        self.trusted_certs)
969                if 'OperationSegmentResponseBody' in r:
970                    r = r['OperationSegmentResponseBody']
971                    if 'status' in r:
972                        self.status = r['status']
973                else:
974                    raise service_error(service_error.internal, 
975                            "Bad response!?: %s" %r)
976                return True
977            except service_error, e:
978                self.log.error("Operation segment failed on %s: %s" % \
979                        (self.testbed, e))
980                return False
981
982    def annotate_topology(self, top, data):
983        # These routines do various parts of the annotation
984        def add_new_names(nl, l):
985            """ add any names in nl to the list in l """
986            for n in nl:
987                if n not in l: l.append(n)
988       
989        def merge_services(ne, e):
990            for ns in ne.service:
991                # NB: the else is on the for
992                for s in e.service:
993                    if ns.name == s.name:
994                        s.importer = ns.importer
995                        s.param = ns.param
996                        s.description = ns.description
997                        s.status = ns.status
998                        break
999                else:
1000                    e.service.append(ns)
1001       
1002        def merge_oses(ne, e):
1003            """
1004            Merge the operating system entries of ne into e
1005            """
1006            for nos in ne.os:
1007                # NB: the else is on the for
1008                for os in e.os:
1009                    if nos.name == os.name:
1010                        os.version = nos.version
1011                        os.version = nos.distribution
1012                        os.version = nos.distributionversion
1013                        for a in nos.attribute:
1014                            if os.get_attribute(a.attribute):
1015                                os.remove_attribute(a.attribute)
1016                            os.set_attribute(a.attribute, a.value)
1017                        break
1018                else:
1019                    # If both nodes have one OS, this is a replacement
1020                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
1021                    else: e.os.append(nos)
1022
1023        # Annotate the topology with embedding info
1024        for e in top.elements:
1025            if isinstance(e, topdl.Computer):
1026                for s in data:
1027                    ne = s.node.get(e.name, None)
1028                    if ne is not None:
1029                        add_new_names(ne.localname, e.localname)
1030                        e.status = ne.status
1031                        merge_services(ne, e)
1032                        add_new_names(ne.operation, e.operation)
1033                        if ne.os: merge_oses(ne, e)
1034                        break
1035
1036
1037
1038    def allocate_resources(self, allocated, masters, eid, expid, 
1039            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1040            attrs=None, connInfo={}, tbmap=None, expcert=None):
1041
1042        started = { }           # Testbeds where a sub-experiment started
1043                                # successfully
1044
1045        # XXX
1046        fail_soft = False
1047
1048        if tbmap is None: tbmap = { }
1049
1050        log = alloc_log or self.log
1051
1052        tp = thread_pool(self.nthreads)
1053        threads = [ ]
1054        starters = [ ]
1055
1056        if expcert:
1057            cert = expcert
1058            pw = None
1059        else:
1060            cert = self.cert_file
1061            pw = self.cert_pwd
1062
1063        for tb in allocated.keys():
1064            # Create and start a thread to start the segment, and save it
1065            # to get the return value later
1066            tb_attrs = copy.copy(attrs)
1067            tp.wait_for_slot()
1068            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1069            base, suffix = split_testbed(tb)
1070            if suffix:
1071                tb_attrs.append({'attribute': 'experiment_name', 
1072                    'value': "%s-%s" % (eid, suffix)})
1073            else:
1074                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1075            if not uri:
1076                raise service_error(service_error.internal, 
1077                        "Unknown testbed %s !?" % tb)
1078
1079            aid = tbparams[tb].allocID
1080            if not aid:
1081                raise service_error(service_error.internal, 
1082                        "No alloc id for testbed %s !?" % tb)
1083
1084            s = self.start_segment(log=log, debug=self.debug,
1085                    testbed=tb, cert_file=cert,
1086                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1087                    caller=self.call_StartSegment,
1088                    log_collector=log_collector)
1089            starters.append(s)
1090            t  = pooled_thread(\
1091                    target=s, name=tb,
1092                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1093                    pdata=tp, trace_file=self.trace_file)
1094            threads.append(t)
1095            t.start()
1096
1097        # Wait until all finish (keep pinging the log, though)
1098        mins = 0
1099        revoked = False
1100        while not tp.wait_for_all_done(60.0):
1101            mins += 1
1102            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1103                    % mins)
1104            if not revoked and \
1105                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1106                # a testbed has failed.  Revoke this experiment's
1107                # synchronizarion values so that sub experiments will not
1108                # deadlock waiting for synchronization that will never happen
1109                self.log.info("A subexperiment has failed to swap in, " + \
1110                        "revoking synch keys")
1111                var_key = "fedid:%s" % expid
1112                for k in self.synch_store.all_keys():
1113                    if len(k) > 45 and k[0:46] == var_key:
1114                        self.synch_store.revoke_key(k)
1115                revoked = True
1116
1117        failed = [ t.getName() for t in threads if not t.rv ]
1118        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1119
1120        # If one failed clean up, unless fail_soft is set
1121        if failed:
1122            if not fail_soft:
1123                tp.clear()
1124                for tb in succeeded:
1125                    # Create and start a thread to stop the segment
1126                    tp.wait_for_slot()
1127                    uri = tbparams[tb].uri
1128                    t  = pooled_thread(\
1129                            target=self.terminate_segment(log=log,
1130                                testbed=tb,
1131                                cert_file=cert, 
1132                                cert_pwd=pw,
1133                                trusted_certs=self.trusted_certs,
1134                                caller=self.call_TerminateSegment),
1135                            args=(uri, tbparams[tb].allocID),
1136                            name=tb,
1137                            pdata=tp, trace_file=self.trace_file)
1138                    t.start()
1139                # Wait until all finish (if any are being stopped)
1140                if succeeded:
1141                    tp.wait_for_all_done()
1142
1143                # release the allocations
1144                for tb in tbparams.keys():
1145                    try:
1146                        self.release_access(tb, tbparams[tb].allocID, 
1147                                tbmap=tbmap, uri=tbparams[tb].uri,
1148                                cert_file=cert, cert_pwd=pw)
1149                    except service_error, e:
1150                        self.log.warn("Error releasing access: %s" % e.desc)
1151                # Remove the placeholder
1152                self.state_lock.acquire()
1153                self.state[eid].status = 'failed'
1154                self.state[eid].updated()
1155                if self.state_filename: self.write_state()
1156                self.state_lock.release()
1157                # Remove the repo dir
1158                self.remove_dirs("%s/%s" %(self.repodir, expid))
1159                # Walk up tmpdir, deleting as we go
1160                if self.cleanup:
1161                    self.remove_dirs(tmpdir)
1162                else:
1163                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1164
1165
1166                log.error("Swap in failed on %s" % ",".join(failed))
1167                return
1168        else:
1169            # Walk through the successes and gather the proofs
1170            proofs = { }
1171            for s in starters:
1172                if s.proof:
1173                    proofs[s.testbed] = s.proof
1174            self.annotate_topology(top, starters)
1175            log.info("[start_segment]: Experiment %s active" % eid)
1176
1177
1178        # Walk up tmpdir, deleting as we go
1179        if self.cleanup:
1180            self.remove_dirs(tmpdir)
1181        else:
1182            log.debug("[start_experiment]: not removing %s" % tmpdir)
1183
1184        # Insert the experiment into our state and update the disk copy.
1185        self.state_lock.acquire()
1186        self.state[expid].status = 'active'
1187        self.state[eid] = self.state[expid]
1188        self.state[eid].top = top
1189        self.state[eid].updated()
1190        # Append startup proofs
1191        for f in self.state[eid].get_all_allocations():
1192            if f.tb in proofs:
1193                f.proof.append(proofs[f.tb])
1194
1195        if self.state_filename: self.write_state()
1196        self.state_lock.release()
1197        return
1198
1199
1200    def add_kit(self, e, kit):
1201        """
1202        Add a Software object created from the list of (install, location)
1203        tuples passed as kit  to the software attribute of an object e.  We
1204        do this enough to break out the code, but it's kind of a hack to
1205        avoid changing the old tuple rep.
1206        """
1207
1208        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1209
1210        if isinstance(e.software, list): e.software.extend(s)
1211        else: e.software = s
1212
1213    def append_experiment_authorization(self, expid, attrs, 
1214            need_state_lock=True):
1215        """
1216        Append the authorization information to system state
1217        """
1218
1219        for p, a in attrs:
1220            self.auth.set_attribute(p, a)
1221        self.auth.save()
1222
1223        if need_state_lock: self.state_lock.acquire()
1224        # XXX: really a no op?
1225        #self.state[expid]['auth'].update(attrs)
1226        if self.state_filename: self.write_state()
1227        if need_state_lock: self.state_lock.release()
1228
1229    def clear_experiment_authorization(self, expid, need_state_lock=True):
1230        """
1231        Attrs is a set of attribute principal pairs that need to be removed
1232        from the authenticator.  Remove them and save the authenticator.
1233        """
1234
1235        if need_state_lock: self.state_lock.acquire()
1236        # XXX: should be a no-op
1237        #if expid in self.state and 'auth' in self.state[expid]:
1238            #for p, a in self.state[expid]['auth']:
1239                #self.auth.unset_attribute(p, a)
1240            #self.state[expid]['auth'] = set()
1241        if self.state_filename: self.write_state()
1242        if need_state_lock: self.state_lock.release()
1243        self.auth.save()
1244
1245
1246    def create_experiment_state(self, fid, req, expid, expcert,
1247            state='starting'):
1248        """
1249        Create the initial entry in the experiment's state.  The expid and
1250        expcert are the experiment's fedid and certifacte that represents that
1251        ID, which are installed in the experiment state.  If the request
1252        includes a suggested local name that is used if possible.  If the local
1253        name is already taken by an experiment owned by this user that has
1254        failed, it is overwritten.  Otherwise new letters are added until a
1255        valid localname is found.  The generated local name is returned.
1256        """
1257
1258        if req.has_key('experimentID') and \
1259                req['experimentID'].has_key('localname'):
1260            overwrite = False
1261            eid = req['experimentID']['localname']
1262            # If there's an old failed experiment here with the same local name
1263            # and accessible by this user, we'll overwrite it, otherwise we'll
1264            # fall through and do the collision avoidance.
1265            old_expid = self.get_experiment_fedid(eid)
1266            if old_expid:
1267                users_experiment = True
1268                try:
1269                    self.check_experiment_access(fid, old_expid)
1270                except service_error, e:
1271                    if e.code == service_error.access: users_experiment = False
1272                    else: raise e
1273                if users_experiment:
1274                    self.state_lock.acquire()
1275                    status = self.state[eid].status
1276                    if status and status == 'failed':
1277                        # remove the old access attributes
1278                        self.clear_experiment_authorization(eid,
1279                                need_state_lock=False)
1280                        overwrite = True
1281                        del self.state[eid]
1282                        del self.state[old_expid]
1283                    self.state_lock.release()
1284                else:
1285                    self.log.info('Experiment %s exists, ' % eid + \
1286                            'but this user cannot access it')
1287            self.state_lock.acquire()
1288            while (self.state.has_key(eid) and not overwrite):
1289                eid += random.choice(string.ascii_letters)
1290            # Initial state
1291            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1292                    identity=expcert)
1293            self.state[expid] = self.state[eid]
1294            if self.state_filename: self.write_state()
1295            self.state_lock.release()
1296        else:
1297            eid = self.exp_stem
1298            for i in range(0,5):
1299                eid += random.choice(string.ascii_letters)
1300            self.state_lock.acquire()
1301            while (self.state.has_key(eid)):
1302                eid = self.exp_stem
1303                for i in range(0,5):
1304                    eid += random.choice(string.ascii_letters)
1305            # Initial state
1306            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1307                    identity=expcert)
1308            self.state[expid] = self.state[eid]
1309            if self.state_filename: self.write_state()
1310            self.state_lock.release()
1311
1312        # Let users touch the state.  Authorize this fid and the expid itself
1313        # to touch the experiment, as well as allowing th eoverrides.
1314        self.append_experiment_authorization(eid, 
1315                set([(fid, expid), (expid,expid)] + \
1316                        [ (o, expid) for o in self.overrides]))
1317
1318        return eid
1319
1320
1321    def allocate_ips_to_topo(self, top):
1322        """
1323        Add an ip4_address attribute to all the hosts in the topology, based on
1324        the shared substrates on which they sit.  An /etc/hosts file is also
1325        created and returned as a list of hostfiles entries.  We also return
1326        the allocator, because we may need to allocate IPs to portals
1327        (specifically DRAGON portals).
1328        """
1329        subs = sorted(top.substrates, 
1330                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1331                reverse=True)
1332        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1333        ifs = { }
1334        hosts = [ ]
1335
1336        for idx, s in enumerate(subs):
1337            net_size = len(s.interfaces)+2
1338
1339            a = ips.allocate(net_size)
1340            if a :
1341                base, num = a
1342                if num < net_size: 
1343                    raise service_error(service_error.internal,
1344                            "Allocator returned wrong number of IPs??")
1345            else:
1346                raise service_error(service_error.req, 
1347                        "Cannot allocate IP addresses")
1348            mask = ips.min_alloc
1349            while mask < net_size:
1350                mask *= 2
1351
1352            netmask = ((2**32-1) ^ (mask-1))
1353
1354            base += 1
1355            for i in s.interfaces:
1356                i.attribute.append(
1357                        topdl.Attribute('ip4_address', 
1358                            "%s" % ip_addr(base)))
1359                i.attribute.append(
1360                        topdl.Attribute('ip4_netmask', 
1361                            "%s" % ip_addr(int(netmask))))
1362
1363                hname = i.element.name
1364                if ifs.has_key(hname):
1365                    hosts.append("%s\t%s-%s %s-%d" % \
1366                            (ip_addr(base), hname, s.name, hname,
1367                                ifs[hname]))
1368                else:
1369                    ifs[hname] = 0
1370                    hosts.append("%s\t%s-%s %s-%d %s" % \
1371                            (ip_addr(base), hname, s.name, hname,
1372                                ifs[hname], hname))
1373
1374                ifs[hname] += 1
1375                base += 1
1376        return hosts, ips
1377
1378    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1379            tbparam, masters, tbmap, expid=None, expcert=None):
1380        for tb in testbeds:
1381            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1382                    expcert)
1383            allocated[tb] = 1
1384
1385    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1386            expcert=None):
1387        """
1388        Get access to testbed through fedd and set the parameters for that tb
1389        """
1390        def get_export_project(svcs):
1391            """
1392            Look through for the list of federated_service for this testbed
1393            objects for a project_export service, and extract the project
1394            parameter.
1395            """
1396
1397            pe = [s for s in svcs if s.name=='project_export']
1398            if len(pe) == 1:
1399                return pe[0].params.get('project', None)
1400            elif len(pe) == 0:
1401                return None
1402            else:
1403                raise service_error(service_error.req,
1404                        "More than one project export is not supported")
1405
1406        def add_services(svcs, type, slist, keys):
1407            """
1408            Add the given services to slist.  type is import or export.  Also
1409            add a mapping entry from the assigned id to the original service
1410            record.
1411            """
1412            for i, s in enumerate(svcs):
1413                idx = '%s%d' % (type, i)
1414                keys[idx] = s
1415                sr = {'id': idx, 'name': s.name, 'visibility': type }
1416                if s.params:
1417                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1418                            for k, v in s.params.items()]
1419                slist.append(sr)
1420
1421        uri = tbmap.get(testbed_base(tb), None)
1422        if not uri:
1423            raise service_error(service_error.server_config, 
1424                    "Unknown testbed: %s" % tb)
1425
1426        export_svcs = masters.get(tb,[])
1427        import_svcs = [ s for m in masters.values() \
1428                for s in m \
1429                    if tb in s.importers ]
1430
1431        export_project = get_export_project(export_svcs)
1432        # Compose the credential list so that IDs come before attributes
1433        creds = set()
1434        keys = set()
1435        certs = self.auth.get_creds_for_principal(fid)
1436        if expid:
1437            certs.update(self.auth.get_creds_for_principal(expid))
1438        for c in certs:
1439            keys.add(c.issuer_cert())
1440            creds.add(c.attribute_cert())
1441        creds = list(keys) + list(creds)
1442
1443        if expcert: cert, pw = expcert, None
1444        else: cert, pw = self.cert_file, self.cert_pw
1445
1446        # Request credentials
1447        req = {
1448                'abac_credential': creds,
1449            }
1450        # Make the service request from the services we're importing and
1451        # exporting.  Keep track of the export request ids so we can
1452        # collect the resulting info from the access response.
1453        e_keys = { }
1454        if import_svcs or export_svcs:
1455            slist = []
1456            add_services(import_svcs, 'import', slist, e_keys)
1457            add_services(export_svcs, 'export', slist, e_keys)
1458            req['service'] = slist
1459
1460        if self.local_access.has_key(uri):
1461            # Local access call
1462            req = { 'RequestAccessRequestBody' : req }
1463            r = self.local_access[uri].RequestAccess(req, 
1464                    fedid(file=self.cert_file))
1465            r = { 'RequestAccessResponseBody' : r }
1466        else:
1467            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1468
1469        if r.has_key('RequestAccessResponseBody'):
1470            # Through to here we have a valid response, not a fault.
1471            # Access denied is a fault, so something better or worse than
1472            # access denied has happened.
1473            r = r['RequestAccessResponseBody']
1474            self.log.debug("[get_access] Access granted")
1475        else:
1476            raise service_error(service_error.protocol,
1477                        "Bad proxy response")
1478        if 'proof' not in r:
1479            raise service_error(service_error.protocol,
1480                        "Bad access response (no access proof)")
1481
1482        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1483                tb=tb, uri=uri, proof=[r['proof']], 
1484                services=masters.get(tb, None))
1485
1486        # Collect the responses corresponding to the services this testbed
1487        # exports.  These will be the service requests that we will include in
1488        # the start segment requests (with appropriate visibility values) to
1489        # import and export the segments.
1490        for s in r.get('service', []):
1491            id = s.get('id', None)
1492            # Note that this attaches the response to the object in the masters
1493            # data structure.  (The e_keys index disappears when this fcn
1494            # returns)
1495            if id and id in e_keys:
1496                e_keys[id].reqs.append(s)
1497
1498        # Add attributes to parameter space.  We don't allow attributes to
1499        # overlay any parameters already installed.
1500        for a in r.get('fedAttr', []):
1501            try:
1502                if a['attribute']:
1503                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1504            except KeyError:
1505                self.log.error("Bad attribute in response: %s" % a)
1506
1507
1508    def split_topology(self, top, topo, testbeds):
1509        """
1510        Create the sub-topologies that are needed for experiment instantiation.
1511        """
1512        for tb in testbeds:
1513            topo[tb] = top.clone()
1514            # copy in for loop allows deletions from the original
1515            for e in [ e for e in topo[tb].elements]:
1516                etb = e.get_attribute('testbed')
1517                # NB: elements without a testbed attribute won't appear in any
1518                # sub topologies. 
1519                if not etb or etb != tb:
1520                    for i in e.interface:
1521                        for s in i.subs:
1522                            try:
1523                                s.interfaces.remove(i)
1524                            except ValueError:
1525                                raise service_error(service_error.internal,
1526                                        "Can't remove interface??")
1527                    topo[tb].elements.remove(e)
1528            topo[tb].make_indices()
1529
1530    def confirm_software(self, top):
1531        """
1532        Make sure that the software to be loaded in the topo is all available
1533        before we begin making access requests, etc.  This is a subset of
1534        wrangle_software.
1535        """
1536        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1537        pkgs.update([x.location for e in top.elements for x in e.software])
1538
1539        for pkg in pkgs:
1540            loc = pkg
1541
1542            scheme, host, path = urlparse(loc)[0:3]
1543            dest = os.path.basename(path)
1544            if not scheme:
1545                if not loc.startswith('/'):
1546                    loc = "/%s" % loc
1547                loc = "file://%s" %loc
1548            # NB: if scheme was found, loc == pkg
1549            try:
1550                u = urlopen(loc)
1551                u.close()
1552            except Exception, e:
1553                raise service_error(service_error.req, 
1554                        "Cannot open %s: %s" % (loc, e))
1555        return True
1556
1557    def wrangle_software(self, expid, top, topo, tbparams):
1558        """
1559        Copy software out to the repository directory, allocate permissions and
1560        rewrite the segment topologies to look for the software in local
1561        places.
1562        """
1563
1564        # Copy the rpms and tarfiles to a distribution directory from
1565        # which the federants can retrieve them
1566        linkpath = "%s/software" %  expid
1567        softdir ="%s/%s" % ( self.repodir, linkpath)
1568        softmap = { }
1569
1570        # self.fedkit and self.gateway kit are lists of tuples of
1571        # (install_location, download_location) this extracts the download
1572        # locations.
1573        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1574        pkgs.update([x.location for e in top.elements for x in e.software])
1575        try:
1576            os.makedirs(softdir)
1577        except EnvironmentError, e:
1578            raise service_error(
1579                    "Cannot create software directory: %s" % e)
1580        # The actual copying.  Everything's converted into a url for copying.
1581        auth_attrs = set()
1582        for pkg in pkgs:
1583            loc = pkg
1584
1585            scheme, host, path = urlparse(loc)[0:3]
1586            dest = os.path.basename(path)
1587            if not scheme:
1588                if not loc.startswith('/'):
1589                    loc = "/%s" % loc
1590                loc = "file://%s" %loc
1591            # NB: if scheme was found, loc == pkg
1592            try:
1593                u = urlopen(loc)
1594            except Exception, e:
1595                raise service_error(service_error.req, 
1596                        "Cannot open %s: %s" % (loc, e))
1597            try:
1598                f = open("%s/%s" % (softdir, dest) , "w")
1599                self.log.debug("Writing %s/%s" % (softdir,dest) )
1600                data = u.read(4096)
1601                while data:
1602                    f.write(data)
1603                    data = u.read(4096)
1604                f.close()
1605                u.close()
1606            except Exception, e:
1607                raise service_error(service_error.internal,
1608                        "Could not copy %s: %s" % (loc, e))
1609            path = re.sub("/tmp", "", linkpath)
1610            # XXX
1611            softmap[pkg] = \
1612                    "%s/%s/%s" %\
1613                    ( self.repo_url, path, dest)
1614
1615            # Allow the individual segments to access the software by assigning
1616            # an attribute to each testbed allocation that encodes the data to
1617            # be released.  This expression collects the data for each run of
1618            # the loop.
1619            auth_attrs.update([
1620                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1621                        for tb in tbparams.keys()])
1622
1623        self.append_experiment_authorization(expid, auth_attrs)
1624
1625        # Convert the software locations in the segments into the local
1626        # copies on this host
1627        for soft in [ s for tb in topo.values() \
1628                for e in tb.elements \
1629                    if getattr(e, 'software', False) \
1630                        for s in e.software ]:
1631            if softmap.has_key(soft.location):
1632                soft.location = softmap[soft.location]
1633
1634
1635    def new_experiment(self, req, fid):
1636        """
1637        The external interface to empty initial experiment creation called from
1638        the dispatcher.
1639
1640        Creates a working directory, splits the incoming description using the
1641        splitter script and parses out the avrious subsections using the
1642        lcasses above.  Once each sub-experiment is created, use pooled threads
1643        to instantiate them and start it all up.
1644        """
1645        req = req.get('NewRequestBody', None)
1646        if not req:
1647            raise service_error(service_error.req,
1648                    "Bad request format (no NewRequestBody)")
1649
1650        if self.auth.import_credentials(data_list=req.get('credential', [])):
1651            self.auth.save()
1652
1653        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1654                with_proof=True)
1655
1656        if not access_ok:
1657            raise service_error(service_error.access, "New access denied",
1658                    proof=[proof])
1659
1660        try:
1661            tmpdir = tempfile.mkdtemp(prefix="split-")
1662        except EnvironmentError:
1663            raise service_error(service_error.internal, "Cannot create tmp dir")
1664
1665        try:
1666            access_user = self.accessdb[fid]
1667        except KeyError:
1668            raise service_error(service_error.internal,
1669                    "Access map and authorizer out of sync in " + \
1670                            "new_experiment for fedid %s"  % fid)
1671
1672        # Generate an ID for the experiment (slice) and a certificate that the
1673        # allocator can use to prove they own it.  We'll ship it back through
1674        # the encrypted connection.  If the requester supplied one, use it.
1675        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1676            expcert = req['experimentAccess']['X509']
1677            expid = fedid(certstr=expcert)
1678            self.state_lock.acquire()
1679            if expid in self.state:
1680                self.state_lock.release()
1681                raise service_error(service_error.req, 
1682                        'fedid %s identifies an existing experiment' % expid)
1683            self.state_lock.release()
1684        else:
1685            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1686
1687        #now we're done with the tmpdir, and it should be empty
1688        if self.cleanup:
1689            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1690            os.rmdir(tmpdir)
1691        else:
1692            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1693
1694        eid = self.create_experiment_state(fid, req, expid, expcert, 
1695                state='empty')
1696
1697        rv = {
1698                'experimentID': [
1699                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1700                ],
1701                'experimentStatus': 'empty',
1702                'experimentAccess': { 'X509' : expcert },
1703                'proof': proof.to_dict(),
1704            }
1705
1706        return rv
1707
1708    # create_experiment sub-functions
1709
1710    @staticmethod
1711    def get_experiment_key(req, field='experimentID'):
1712        """
1713        Parse the experiment identifiers out of the request (the request body
1714        tag has been removed).  Specifically this pulls either the fedid or the
1715        localname out of the experimentID field.  A fedid is preferred.  If
1716        neither is present or the request does not contain the fields,
1717        service_errors are raised.
1718        """
1719        # Get the experiment access
1720        exp = req.get(field, None)
1721        if exp:
1722            if exp.has_key('fedid'):
1723                key = exp['fedid']
1724            elif exp.has_key('localname'):
1725                key = exp['localname']
1726            else:
1727                raise service_error(service_error.req, "Unknown lookup type")
1728        else:
1729            raise service_error(service_error.req, "No request?")
1730
1731        return key
1732
1733    def get_experiment_ids_and_start(self, key, tmpdir):
1734        """
1735        Get the experiment name, id and access certificate from the state, and
1736        set the experiment state to 'starting'.  returns a triple (fedid,
1737        localname, access_cert_file). The access_cert_file is a copy of the
1738        contents of the access certificate, created in the tempdir with
1739        restricted permissions.  If things are confused, raise an exception.
1740        """
1741
1742        expid = eid = None
1743        self.state_lock.acquire()
1744        if key in self.state:
1745            exp = self.state[key]
1746            exp.status = "starting"
1747            exp.updated()
1748            expid = exp.fedid
1749            eid = exp.localname
1750            expcert = exp.identity
1751        self.state_lock.release()
1752
1753        # make a protected copy of the access certificate so the experiment
1754        # controller can act as the experiment principal.
1755        if expcert:
1756            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1757            if not expcert_file:
1758                raise service_error(service_error.internal, 
1759                        "Cannot create temp cert file?")
1760        else:
1761            expcert_file = None
1762
1763        return (eid, expid, expcert_file)
1764
1765    def get_topology(self, req, tmpdir):
1766        """
1767        Get the ns2 content and put it into a file for parsing.  Call the local
1768        or remote parser and return the topdl.Topology.  Errors result in
1769        exceptions.  req is the request and tmpdir is a work directory.
1770        """
1771
1772        # The tcl parser needs to read a file so put the content into that file
1773        descr=req.get('experimentdescription', None)
1774        if descr:
1775            if 'ns2description' in descr:
1776                file_content=descr['ns2description']
1777            elif 'topdldescription' in descr:
1778                return topdl.Topology(**descr['topdldescription'])
1779            else:
1780                raise service_error(service_error.req, 
1781                        'Unknown experiment description type')
1782        else:
1783            raise service_error(service_error.req, "No experiment description")
1784
1785
1786        if self.splitter_url:
1787            self.log.debug("Calling remote topdl translator at %s" % \
1788                    self.splitter_url)
1789            top = self.remote_ns2topdl(self.splitter_url, file_content)
1790        else:
1791            tclfile = os.path.join(tmpdir, "experiment.tcl")
1792            if file_content:
1793                try:
1794                    f = open(tclfile, 'w')
1795                    f.write(file_content)
1796                    f.close()
1797                except EnvironmentError:
1798                    raise service_error(service_error.internal,
1799                            "Cannot write temp experiment description")
1800            else:
1801                raise service_error(service_error.req, 
1802                        "Only ns2descriptions supported")
1803            pid = "dummy"
1804            gid = "dummy"
1805            eid = "dummy"
1806
1807            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1808                str(self.muxmax), '-m', 'dummy']
1809
1810            tclcmd.extend([pid, gid, eid, tclfile])
1811
1812            self.log.debug("running local splitter %s", " ".join(tclcmd))
1813            # This is just fantastic.  As a side effect the parser copies
1814            # tb_compat.tcl into the current directory, so that directory
1815            # must be writable by the fedd user.  Doing this in the
1816            # temporary subdir ensures this is the case.
1817            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1818                    cwd=tmpdir)
1819            split_data = tclparser.stdout
1820
1821            top = topdl.topology_from_xml(file=split_data, top="experiment")
1822            os.remove(tclfile)
1823
1824        return top
1825
1826    def get_testbed_services(self, req, testbeds):
1827        """
1828        Parse the services section of the request into two dicts mapping
1829        testbed to lists of federated_service objects.  The first dict maps all
1830        exporters of services to those service objects, the second maps
1831        testbeds to service objects only for services requiring portals.
1832        """
1833        # We construct both dicts here because deriving the second is more
1834        # comples than it looks - both the keys and lists can differ, and it's
1835        # much easier to generate both in one pass.
1836        masters = { }
1837        pmasters = { }
1838        for s in req.get('service', []):
1839            # If this is a service request with the importall field
1840            # set, fill it out.
1841
1842            if s.get('importall', False):
1843                s['import'] = [ tb for tb in testbeds \
1844                        if tb not in s.get('export',[])]
1845                del s['importall']
1846
1847            # Add the service to masters
1848            for tb in s.get('export', []):
1849                if s.get('name', None):
1850
1851                    params = { }
1852                    for a in s.get('fedAttr', []):
1853                        params[a.get('attribute', '')] = a.get('value','')
1854
1855                    fser = federated_service(name=s['name'],
1856                            exporter=tb, importers=s.get('import',[]),
1857                            params=params)
1858                    if fser.name == 'hide_hosts' \
1859                            and 'hosts' not in fser.params:
1860                        fser.params['hosts'] = \
1861                                ",".join(tb_hosts.get(fser.exporter, []))
1862                    if tb in masters: masters[tb].append(fser)
1863                    else: masters[tb] = [fser]
1864
1865                    if fser.portal:
1866                        if tb in pmasters: pmasters[tb].append(fser)
1867                        else: pmasters[tb] = [fser]
1868                else:
1869                    self.log.error('Testbed service does not have name " + \
1870                            "and importers')
1871        return masters, pmasters
1872
1873    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1874        """
1875        Create the ssh keys necessary for interconnecting the portal nodes and
1876        the global hosts file for letting each segment know about the IP
1877        addresses in play.  Save these into the repo.  Add attributes to the
1878        autorizer allowing access controllers to download them and return a set
1879        of attributes that inform the segments where to find this stuff.  May
1880        raise service_errors in if there are problems.
1881        """
1882        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1883        gw_secretkey_base = "fed.%s" % self.ssh_type
1884        keydir = os.path.join(tmpdir, 'keys')
1885        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1886        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1887
1888        try:
1889            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1890        except ValueError:
1891            raise service_error(service_error.server_config, 
1892                    "Bad key type (%s)" % self.ssh_type)
1893
1894        self.generate_seer_certs(keydir)
1895
1896        # Copy configuration files into the remote file store
1897        # The config urlpath
1898        configpath = "/%s/config" % expid
1899        # The config file system location
1900        configdir ="%s%s" % ( self.repodir, configpath)
1901        try:
1902            os.makedirs(configdir)
1903        except EnvironmentError, e:
1904            raise service_error(service_error.internal,
1905                    "Cannot create config directory: %s" % e)
1906        try:
1907            f = open("%s/hosts" % configdir, "w")
1908            print >> f, string.join(hosts, '\n')
1909            f.close()
1910        except EnvironmentError, e:
1911            raise service_error(service_error.internal, 
1912                    "Cannot write hosts file: %s" % e)
1913        try:
1914            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1915            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1916            copy_file(os.path.join(keydir, 'ca.pem'), 
1917                    os.path.join(configdir, 'ca.pem'))
1918            copy_file(os.path.join(keydir, 'node.pem'), 
1919                    os.path.join(configdir, 'node.pem'))
1920        except EnvironmentError, e:
1921            raise service_error(service_error.internal, 
1922                    "Cannot copy keyfiles: %s" % e)
1923
1924        # Allow the individual testbeds to access the configuration files,
1925        # again by setting an attribute for the relevant pathnames on each
1926        # allocation principal.  Yeah, that's a long list comprehension.
1927        self.append_experiment_authorization(expid, set([
1928            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1929                    for tb in tbparams.keys() \
1930                        for f in ("hosts", 'ca.pem', 'node.pem', 
1931                            gw_secretkey_base, gw_pubkey_base)]))
1932
1933        attrs = [ 
1934                {
1935                    'attribute': 'ssh_pubkey', 
1936                    'value': '%s/%s/config/%s' % \
1937                            (self.repo_url, expid, gw_pubkey_base)
1938                },
1939                {
1940                    'attribute': 'ssh_secretkey', 
1941                    'value': '%s/%s/config/%s' % \
1942                            (self.repo_url, expid, gw_secretkey_base)
1943                },
1944                {
1945                    'attribute': 'hosts', 
1946                    'value': '%s/%s/config/hosts' % \
1947                            (self.repo_url, expid)
1948                },
1949                {
1950                    'attribute': 'seer_ca_pem', 
1951                    'value': '%s/%s/config/%s' % \
1952                            (self.repo_url, expid, 'ca.pem')
1953                },
1954                {
1955                    'attribute': 'seer_node_pem', 
1956                    'value': '%s/%s/config/%s' % \
1957                            (self.repo_url, expid, 'node.pem')
1958                },
1959            ]
1960        return attrs
1961
1962
1963    def get_vtopo(self, req, fid):
1964        """
1965        Return the stored virtual topology for this experiment
1966        """
1967        rv = None
1968        state = None
1969
1970        req = req.get('VtopoRequestBody', None)
1971        if not req:
1972            raise service_error(service_error.req,
1973                    "Bad request format (no VtopoRequestBody)")
1974        exp = req.get('experiment', None)
1975        if exp:
1976            if exp.has_key('fedid'):
1977                key = exp['fedid']
1978                keytype = "fedid"
1979            elif exp.has_key('localname'):
1980                key = exp['localname']
1981                keytype = "localname"
1982            else:
1983                raise service_error(service_error.req, "Unknown lookup type")
1984        else:
1985            raise service_error(service_error.req, "No request?")
1986
1987        proof = self.check_experiment_access(fid, key)
1988
1989        self.state_lock.acquire()
1990        # XXX: this needs to be recalculated
1991        if key in self.state:
1992            if self.state[key].top is not None:
1993                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1994                rv = { 'experiment' : {keytype: key },
1995                        'vtopo': vtopo,
1996                        'proof': proof.to_dict(), 
1997                    }
1998            else:
1999                state = self.state[key].status
2000        self.state_lock.release()
2001
2002        if rv: return rv
2003        else: 
2004            if state:
2005                raise service_error(service_error.partial, 
2006                        "Not ready: %s" % state)
2007            else:
2008                raise service_error(service_error.req, "No such experiment")
2009
2010    def get_vis(self, req, fid):
2011        """
2012        Return the stored visualization for this experiment
2013        """
2014        rv = None
2015        state = None
2016
2017        req = req.get('VisRequestBody', None)
2018        if not req:
2019            raise service_error(service_error.req,
2020                    "Bad request format (no VisRequestBody)")
2021        exp = req.get('experiment', None)
2022        if exp:
2023            if exp.has_key('fedid'):
2024                key = exp['fedid']
2025                keytype = "fedid"
2026            elif exp.has_key('localname'):
2027                key = exp['localname']
2028                keytype = "localname"
2029            else:
2030                raise service_error(service_error.req, "Unknown lookup type")
2031        else:
2032            raise service_error(service_error.req, "No request?")
2033
2034        proof = self.check_experiment_access(fid, key)
2035
2036        self.state_lock.acquire()
2037        # Generate the visualization
2038        if key in self.state:
2039            if self.state[key].top is not None:
2040                try:
2041                    vis = self.genviz(
2042                            topdl.topology_to_vtopo(self.state[key].top))
2043                except service_error, e:
2044                    self.state_lock.release()
2045                    raise e
2046                rv =  { 'experiment' : {keytype: key },
2047                        'vis': vis,
2048                        'proof': proof.to_dict(), 
2049                        }
2050            else:
2051                state = self.state[key].status
2052        self.state_lock.release()
2053
2054        if rv: return rv
2055        else:
2056            if state:
2057                raise service_error(service_error.partial, 
2058                        "Not ready: %s" % state)
2059            else:
2060                raise service_error(service_error.req, "No such experiment")
2061
2062   
2063    def save_federant_information(self, allocated, tbparams, eid, top):
2064        """
2065        Store the various data that have changed in the experiment state
2066        between when it was started and the beginning of resource allocation.
2067        This is basically the information about each local allocation.  This
2068        fills in the values of the placeholder allocation in the state.  It
2069        also collects the access proofs and returns them as dicts for a
2070        response message.
2071        """
2072        self.state_lock.acquire()
2073        exp = self.state[eid]
2074        exp.top = top.clone()
2075        # save federant information
2076        for k in allocated.keys():
2077            exp.add_allocation(tbparams[k])
2078            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2079                type="testbed", localname=[k], 
2080                service=[ s.to_topdl() for s in tbparams[k].services]))
2081
2082        # Access proofs for the response message
2083        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2084                    for p in tbparams[k].proof]
2085        exp.updated()
2086        if self.state_filename: 
2087            self.write_state()
2088        self.state_lock.release()
2089        return proofs
2090
2091    def clear_placeholder(self, eid, expid, tmpdir):
2092        """
2093        Clear the placeholder and remove any allocated temporary dir.
2094        """
2095
2096        self.state_lock.acquire()
2097        del self.state[eid]
2098        del self.state[expid]
2099        if self.state_filename: self.write_state()
2100        self.state_lock.release()
2101        if tmpdir and self.cleanup:
2102            self.remove_dirs(tmpdir)
2103
2104    # end of create_experiment sub-functions
2105
2106    def create_experiment(self, req, fid):
2107        """
2108        The external interface to experiment creation called from the
2109        dispatcher.
2110
2111        Creates a working directory, splits the incoming description using the
2112        splitter script and parses out the various subsections using the
2113        classes above.  Once each sub-experiment is created, use pooled threads
2114        to instantiate them and start it all up.
2115        """
2116
2117        req = req.get('CreateRequestBody', None)
2118        if req:
2119            key = self.get_experiment_key(req)
2120        else:
2121            raise service_error(service_error.req,
2122                    "Bad request format (no CreateRequestBody)")
2123
2124        # Import information from the requester
2125        if self.auth.import_credentials(data_list=req.get('credential', [])):
2126            self.auth.save()
2127
2128        # Make sure that the caller can talk to us
2129        proof = self.check_experiment_access(fid, key)
2130
2131        # Install the testbed map entries supplied with the request into a copy
2132        # of the testbed map.
2133        tbmap = dict(self.tbmap)
2134        for m in req.get('testbedmap', []):
2135            if 'testbed' in m and 'uri' in m:
2136                tbmap[m['testbed']] = m['uri']
2137
2138        # a place to work
2139        try:
2140            tmpdir = tempfile.mkdtemp(prefix="split-")
2141            os.mkdir(tmpdir+"/keys")
2142        except EnvironmentError:
2143            raise service_error(service_error.internal, "Cannot create tmp dir")
2144
2145        tbparams = { }
2146
2147        eid, expid, expcert_file = \
2148                self.get_experiment_ids_and_start(key, tmpdir)
2149
2150        # This catches exceptions to clear the placeholder if necessary
2151        try: 
2152            if not (eid and expid):
2153                raise service_error(service_error.internal, 
2154                        "Cannot find local experiment info!?")
2155
2156            top = self.get_topology(req, tmpdir)
2157            self.confirm_software(top)
2158            # Assign the IPs
2159            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2160            # Find the testbeds to look up
2161            tb_hosts = { }
2162            testbeds = [ ]
2163            for e in top.elements:
2164                if isinstance(e, topdl.Computer):
2165                    tb = e.get_attribute('testbed') or 'default'
2166                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2167                    else: 
2168                        tb_hosts[tb] = [ e.name ]
2169                        testbeds.append(tb)
2170
2171            masters, pmasters = self.get_testbed_services(req, testbeds)
2172            allocated = { }         # Testbeds we can access
2173            topo ={ }               # Sub topologies
2174            connInfo = { }          # Connection information
2175
2176            self.split_topology(top, topo, testbeds)
2177
2178            self.get_access_to_testbeds(testbeds, fid, allocated, 
2179                    tbparams, masters, tbmap, expid, expcert_file)
2180
2181            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2182
2183            part = experiment_partition(self.auth, self.store_url, tbmap,
2184                    self.muxmax, self.direct_transit)
2185            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2186                    connInfo, expid)
2187
2188            auth_attrs = set()
2189            # Now get access to the dynamic testbeds (those added above)
2190            for tb in [ t for t in topo if t not in allocated]:
2191                self.get_access(tb, tbparams, fid, masters, tbmap, 
2192                        expid, expcert_file)
2193                allocated[tb] = 1
2194                store_keys = topo[tb].get_attribute('store_keys')
2195                # Give the testbed access to keys it exports or imports
2196                if store_keys:
2197                    auth_attrs.update(set([
2198                        (tbparams[tb].allocID, sk) \
2199                                for sk in store_keys.split(" ")]))
2200
2201            if auth_attrs:
2202                self.append_experiment_authorization(expid, auth_attrs)
2203
2204            # transit and disconnected testbeds may not have a connInfo entry.
2205            # Fill in the blanks.
2206            for t in allocated.keys():
2207                if not connInfo.has_key(t):
2208                    connInfo[t] = { }
2209
2210            self.wrangle_software(expid, top, topo, tbparams)
2211
2212            proofs = self.save_federant_information(allocated, tbparams, 
2213                    eid, top)
2214        except service_error, e:
2215            # If something goes wrong in the parse (usually an access error)
2216            # clear the placeholder state.  From here on out the code delays
2217            # exceptions.  Failing at this point returns a fault to the remote
2218            # caller.
2219            self.clear_placeholder(eid, expid, tmpdir)
2220            raise e
2221
2222        # Start the background swapper and return the starting state.  From
2223        # here on out, the state will stick around a while.
2224
2225        # Create a logger that logs to the experiment's state object as well as
2226        # to the main log file.
2227        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2228        alloc_collector = self.list_log(self.state[eid].log)
2229        h = logging.StreamHandler(alloc_collector)
2230        # XXX: there should be a global one of these rather than repeating the
2231        # code.
2232        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2233                    '%d %b %y %H:%M:%S'))
2234        alloc_log.addHandler(h)
2235
2236        # Start a thread to do the resource allocation
2237        t  = Thread(target=self.allocate_resources,
2238                args=(allocated, masters, eid, expid, tbparams, 
2239                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2240                    connInfo, tbmap, expcert_file),
2241                name=eid)
2242        t.start()
2243
2244        rv = {
2245                'experimentID': [
2246                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2247                ],
2248                'experimentStatus': 'starting',
2249                'proof': [ proof.to_dict() ] + proofs,
2250            }
2251
2252        return rv
2253   
2254    def get_experiment_fedid(self, key):
2255        """
2256        find the fedid associated with the localname key in the state database.
2257        """
2258
2259        rv = None
2260        self.state_lock.acquire()
2261        if key in self.state:
2262            rv = self.state[key].fedid
2263        self.state_lock.release()
2264        return rv
2265
2266    def check_experiment_access(self, fid, key):
2267        """
2268        Confirm that the fid has access to the experiment.  Though a request
2269        may be made in terms of a local name, the access attribute is always
2270        the experiment's fedid.
2271        """
2272        if not isinstance(key, fedid):
2273            key = self.get_experiment_fedid(key)
2274
2275        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2276
2277        if access_ok:
2278            return proof
2279        else:
2280            raise service_error(service_error.access, "Access Denied",
2281                proof)
2282
2283
2284    def get_handler(self, path, fid):
2285        """
2286        Perhaps surprisingly named, this function handles HTTP GET requests to
2287        this server (SOAP requests are POSTs).
2288        """
2289        self.log.info("Get handler %s %s" % (path, fid))
2290        # XXX: log proofs?
2291        if self.auth.check_attribute(fid, path):
2292            return ("%s/%s" % (self.repodir, path), "application/binary")
2293        else:
2294            return (None, None)
2295
2296    def update_info(self, key, force=False):
2297        top = None
2298        self.state_lock.acquire()
2299        if key in self.state:
2300            if force or self.state[key].older_than(self.info_cache_limit):
2301                top = self.state[key].top
2302                if top is not None: top = top.clone()
2303                d1, info_params, cert, d2 = \
2304                        self.get_segment_info(self.state[key], need_lock=False)
2305        self.state_lock.release()
2306
2307        if top is None: return
2308
2309        try:
2310            tmpdir = tempfile.mkdtemp(prefix="info-")
2311        except EnvironmentError:
2312            raise service_error(service_error.internal, 
2313                    "Cannot create tmp dir")
2314        cert_file = self.make_temp_certfile(cert, tmpdir)
2315
2316        data = []
2317        try:
2318            for k, (uri, aid) in info_params.items():
2319                info=self.info_segment(log=self.log, testbed=uri,
2320                            cert_file=cert_file, cert_pwd=None,
2321                            trusted_certs=self.trusted_certs,
2322                            caller=self.call_InfoSegment)
2323                info(uri, aid)
2324                data.append(info)
2325        # Clean up the tmpdir no matter what
2326        finally:
2327            if tmpdir: self.remove_dirs(tmpdir)
2328
2329        self.annotate_topology(top, data)
2330        self.state_lock.acquire()
2331        if key in self.state:
2332            self.state[key].top = top
2333            self.state[key].updated()
2334            if self.state_filename: self.write_state()
2335        self.state_lock.release()
2336
2337   
2338    def get_info(self, req, fid):
2339        """
2340        Return all the stored info about this experiment
2341        """
2342        rv = None
2343
2344        req = req.get('InfoRequestBody', None)
2345        if not req:
2346            raise service_error(service_error.req,
2347                    "Bad request format (no InfoRequestBody)")
2348        exp = req.get('experiment', None)
2349        legacy = req.get('legacy', False)
2350        fresh = req.get('fresh', False)
2351        if exp:
2352            if exp.has_key('fedid'):
2353                key = exp['fedid']
2354                keytype = "fedid"
2355            elif exp.has_key('localname'):
2356                key = exp['localname']
2357                keytype = "localname"
2358            else:
2359                raise service_error(service_error.req, "Unknown lookup type")
2360        else:
2361            raise service_error(service_error.req, "No request?")
2362
2363        proof = self.check_experiment_access(fid, key)
2364
2365        self.update_info(key, fresh)
2366
2367        self.state_lock.acquire()
2368        if self.state.has_key(key):
2369            rv = self.state[key].get_info()
2370            # Copy the topo if we need legacy annotations
2371            if legacy:
2372                top = self.state[key].top
2373                if top is not None: top = top.clone()
2374        self.state_lock.release()
2375
2376        # If the legacy visualization and topology representations are
2377        # requested, calculate them and add them to the return.
2378        if legacy and rv is not None:
2379            if top is not None:
2380                vtopo = topdl.topology_to_vtopo(top)
2381                if vtopo is not None:
2382                    rv['vtopo'] = vtopo
2383                    try:
2384                        vis = self.genviz(vtopo)
2385                    except service_error, e:
2386                        self.log.debug('Problem generating visualization: %s' \
2387                                % e)
2388                        vis = None
2389                    if vis is not None:
2390                        rv['vis'] = vis
2391        if rv:
2392            rv['proof'] = proof.to_dict()
2393            return rv
2394        else:
2395            raise service_error(service_error.req, "No such experiment")
2396
2397    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2398            results):
2399        """
2400        Call OperateSegment on multiple testbeds and gather the results.
2401        op_params contains the parameters needed to contact that testbed, cert
2402        is a certificate containing the fedid to use, op is the operation,
2403        testbeds is a dict mapping testbed name to targets in that testbed,
2404        params are the parameters to include a,d results is a growing list of
2405        the results of the calls.
2406        """
2407        try:
2408            tmpdir = tempfile.mkdtemp(prefix="info-")
2409        except EnvironmentError:
2410            raise service_error(service_error.internal, 
2411                    "Cannot create tmp dir")
2412        cert_file = self.make_temp_certfile(cert, tmpdir)
2413
2414        try:
2415            for tb, targets in testbeds.items():
2416                if tb in op_params:
2417                    uri, aid = op_params[tb]
2418                    operate=self.operation_segment(log=self.log, testbed=uri,
2419                                cert_file=cert_file, cert_pwd=None,
2420                                trusted_certs=self.trusted_certs,
2421                                caller=self.call_OperationSegment)
2422                    if operate(uri, aid, op, targets, params):
2423                        if operate.status is not None:
2424                            results.extend(operate.status)
2425                            continue
2426                # Something went wrong in a weird way.  Add statuses
2427                # that reflect that to results
2428                for t in targets:
2429                    results.append(operation_status(t, 
2430                        operation_status.federant,
2431                        'Unexpected error on %s' % tb))
2432        # Clean up the tmpdir no matter what
2433        finally:
2434            if tmpdir: self.remove_dirs(tmpdir)
2435
2436    def do_operation(self, req, fid):
2437        """
2438        Find the testbeds holding each target and ask them to carry out the
2439        operation.  Return the statuses.
2440        """
2441        # Map an element to the testbed containing it
2442        def element_to_tb(e):
2443            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2444            elif isinstance(e, topdl.Testbed): return e.name
2445            else: return None
2446        # If d is an operation_status object, make it a dict
2447        def make_dict(d):
2448            if isinstance(d, dict): return d
2449            elif isinstance(d, operation_status): return d.to_dict()
2450            else: return { }
2451
2452        def element_name(e):
2453            if isinstance(e, topdl.Computer): return e.name
2454            elif isinstance(e, topdl.Testbed): 
2455                if e.localname: return e.localname[0]
2456                else: return None
2457            else: return None
2458
2459        req = req.get('OperationRequestBody', None)
2460        if not req:
2461            raise service_error(service_error.req,
2462                    "Bad request format (no OperationRequestBody)")
2463        exp = req.get('experiment', None)
2464        op = req.get('operation', None)
2465        targets = set(req.get('target', []))
2466        params = req.get('parameter', None)
2467
2468        if exp:
2469            if 'fedid' in exp:
2470                key = exp['fedid']
2471                keytype = "fedid"
2472            elif 'localname' in exp:
2473                key = exp['localname']
2474                keytype = "localname"
2475            else:
2476                raise service_error(service_error.req, "Unknown lookup type")
2477        else:
2478            raise service_error(service_error.req, "No request?")
2479
2480        if op is None or not targets:
2481            raise service_error(service_error.req, "No request?")
2482
2483        proof = self.check_experiment_access(fid, key)
2484        self.state_lock.acquire()
2485        if key in self.state:
2486            d1, op_params, cert, d2 = \
2487                    self.get_segment_info(self.state[key], need_lock=False,
2488                            key='tb')
2489            top = self.state[key].top
2490            if top is not None:
2491                top = top.clone()
2492        self.state_lock.release()
2493
2494        if top is None:
2495            raise service_error(service_error.partial, "No topology yet", 
2496                    proof=proof)
2497
2498        testbeds = { }
2499        results = []
2500        for e in top.elements:
2501            ename = element_name(e)
2502            if ename in targets:
2503                tb = element_to_tb(e)
2504                targets.remove(ename)
2505                if tb is not None:
2506                    if tb in testbeds: testbeds[tb].append(ename)
2507                    else: testbeds[tb] = [ ename ]
2508                else:
2509                    results.append(operation_status(e.name, 
2510                        code=operation_status.no_target, 
2511                        description='Cannot map target to testbed'))
2512
2513        for t in targets:
2514            results.append(operation_status(t, operation_status.no_target))
2515
2516        self.operate_on_segments(op_params, cert, op, testbeds, params,
2517                results)
2518
2519        return { 
2520                'experiment': exp, 
2521                'status': [make_dict(r) for r in results],
2522                'proof': proof.to_dict()
2523                }
2524
2525
2526    def get_multi_info(self, req, fid):
2527        """
2528        Return all the stored info that this fedid can access
2529        """
2530        rv = { 'info': [ ], 'proof': [ ] }
2531
2532        self.state_lock.acquire()
2533        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2534            try:
2535                proof = self.check_experiment_access(fid, key)
2536            except service_error, e:
2537                if e.code == service_error.access:
2538                    continue
2539                else:
2540                    self.state_lock.release()
2541                    raise e
2542
2543            if self.state.has_key(key):
2544                e = self.state[key].get_info()
2545                e['proof'] = proof.to_dict()
2546                rv['info'].append(e)
2547                rv['proof'].append(proof.to_dict())
2548        self.state_lock.release()
2549        return rv
2550
2551    def check_termination_status(self, fed_exp, force):
2552        """
2553        Confirm that the experiment is sin a valid state to stop (or force it)
2554        return the state - invalid states for deletion and force settings cause
2555        exceptions.
2556        """
2557        self.state_lock.acquire()
2558        status = fed_exp.status
2559
2560        if status:
2561            if status in ('starting', 'terminating'):
2562                if not force:
2563                    self.state_lock.release()
2564                    raise service_error(service_error.partial, 
2565                            'Experiment still being created or destroyed')
2566                else:
2567                    self.log.warning('Experiment in %s state ' % status + \
2568                            'being terminated by force.')
2569            self.state_lock.release()
2570            return status
2571        else:
2572            # No status??? trouble
2573            self.state_lock.release()
2574            raise service_error(service_error.internal,
2575                    "Experiment has no status!?")
2576
2577    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2578        ids = []
2579        term_params = { }
2580        if need_lock: self.state_lock.acquire()
2581        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2582        expcert = fed_exp.identity
2583        repo = "%s" % fed_exp.fedid
2584
2585        # Collect the allocation/segment ids into a dict keyed by the fedid
2586        # of the allocation that contains a tuple of uri, aid
2587        for i, fed in enumerate(fed_exp.get_all_allocations()):
2588            uri = fed.uri
2589            aid = fed.allocID
2590            if key == 'aid': term_params[aid] = (uri, aid)
2591            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2592
2593        if need_lock: self.state_lock.release()
2594        return ids, term_params, expcert, repo
2595
2596
2597    def get_termination_info(self, fed_exp):
2598        self.state_lock.acquire()
2599        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2600        # Change the experiment state
2601        fed_exp.status = 'terminating'
2602        fed_exp.updated()
2603        if self.state_filename: self.write_state()
2604        self.state_lock.release()
2605
2606        return ids, term_params, expcert, repo
2607
2608
2609    def deallocate_resources(self, term_params, expcert, status, force, 
2610            dealloc_log):
2611        tmpdir = None
2612        # This try block makes sure the tempdir is cleared
2613        try:
2614            # If no expcert, try the deallocation as the experiment
2615            # controller instance.
2616            if expcert and self.auth_type != 'legacy': 
2617                try:
2618                    tmpdir = tempfile.mkdtemp(prefix="term-")
2619                except EnvironmentError:
2620                    raise service_error(service_error.internal, 
2621                            "Cannot create tmp dir")
2622                cert_file = self.make_temp_certfile(expcert, tmpdir)
2623                pw = None
2624            else: 
2625                cert_file = self.cert_file
2626                pw = self.cert_pwd
2627
2628            # Stop everyone.  NB, wait_for_all waits until a thread starts
2629            # and then completes, so we can't wait if nothing starts.  So,
2630            # no tbparams, no start.
2631            if len(term_params) > 0:
2632                tp = thread_pool(self.nthreads)
2633                for k, (uri, aid) in term_params.items():
2634                    # Create and start a thread to stop the segment
2635                    tp.wait_for_slot()
2636                    t  = pooled_thread(\
2637                            target=self.terminate_segment(log=dealloc_log,
2638                                testbed=uri,
2639                                cert_file=cert_file, 
2640                                cert_pwd=pw,
2641                                trusted_certs=self.trusted_certs,
2642                                caller=self.call_TerminateSegment),
2643                            args=(uri, aid), name=k,
2644                            pdata=tp, trace_file=self.trace_file)
2645                    t.start()
2646                # Wait for completions
2647                tp.wait_for_all_done()
2648
2649            # release the allocations (failed experiments have done this
2650            # already, and starting experiments may be in odd states, so we
2651            # ignore errors releasing those allocations
2652            try: 
2653                for k, (uri, aid)  in term_params.items():
2654                    self.release_access(None, aid, uri=uri,
2655                            cert_file=cert_file, cert_pwd=pw)
2656            except service_error, e:
2657                if status != 'failed' and not force:
2658                    raise e
2659
2660        # Clean up the tmpdir no matter what
2661        finally:
2662            if tmpdir: self.remove_dirs(tmpdir)
2663
2664    def terminate_experiment(self, req, fid):
2665        """
2666        Swap this experiment out on the federants and delete the shared
2667        information
2668        """
2669        tbparams = { }
2670        req = req.get('TerminateRequestBody', None)
2671        if not req:
2672            raise service_error(service_error.req,
2673                    "Bad request format (no TerminateRequestBody)")
2674
2675        key = self.get_experiment_key(req, 'experiment')
2676        proof = self.check_experiment_access(fid, key)
2677        exp = req.get('experiment', False)
2678        force = req.get('force', False)
2679
2680        dealloc_list = [ ]
2681
2682
2683        # Create a logger that logs to the dealloc_list as well as to the main
2684        # log file.
2685        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2686        dealloc_log.info("Terminating %s " %key)
2687        h = logging.StreamHandler(self.list_log(dealloc_list))
2688        # XXX: there should be a global one of these rather than repeating the
2689        # code.
2690        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2691                    '%d %b %y %H:%M:%S'))
2692        dealloc_log.addHandler(h)
2693
2694        self.state_lock.acquire()
2695        fed_exp = self.state.get(key, None)
2696        self.state_lock.release()
2697        repo = None
2698
2699        if fed_exp:
2700            status = self.check_termination_status(fed_exp, force)
2701            # get_termination_info updates the experiment state
2702            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2703            self.deallocate_resources(term_params, expcert, status, force, 
2704                    dealloc_log)
2705
2706            # Remove the terminated experiment
2707            self.state_lock.acquire()
2708            for id in ids:
2709                self.clear_experiment_authorization(id, need_state_lock=False)
2710                if id in self.state: del self.state[id]
2711
2712            if self.state_filename: self.write_state()
2713            self.state_lock.release()
2714
2715            # Delete any synch points associated with this experiment.  All
2716            # synch points begin with the fedid of the experiment.
2717            fedid_keys = set(["fedid:%s" % f for f in ids \
2718                    if isinstance(f, fedid)])
2719            for k in self.synch_store.all_keys():
2720                try:
2721                    if len(k) > 45 and k[0:46] in fedid_keys:
2722                        self.synch_store.del_value(k)
2723                except synch_store.BadDeletionError:
2724                    pass
2725            self.write_store()
2726
2727            # Remove software and other cached stuff from the filesystem.
2728            if repo:
2729                self.remove_dirs("%s/%s" % (self.repodir, repo))
2730       
2731            return { 
2732                    'experiment': exp , 
2733                    'deallocationLog': string.join(dealloc_list, ''),
2734                    'proof': [proof.to_dict()],
2735                    }
2736        else:
2737            raise service_error(service_error.req, "No saved state")
2738
2739
2740    def GetValue(self, req, fid):
2741        """
2742        Get a value from the synchronized store
2743        """
2744        req = req.get('GetValueRequestBody', None)
2745        if not req:
2746            raise service_error(service_error.req,
2747                    "Bad request format (no GetValueRequestBody)")
2748       
2749        name = req.get('name', None)
2750        wait = req.get('wait', False)
2751        rv = { 'name': name }
2752
2753        if not name:
2754            raise service_error(service_error.req, "No name?")
2755
2756        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2757
2758        if access_ok:
2759            self.log.debug("[GetValue] asking for %s " % name)
2760            try:
2761                v = self.synch_store.get_value(name, wait)
2762            except synch_store.RevokedKeyError:
2763                # No more synch on this key
2764                raise service_error(service_error.federant, 
2765                        "Synch key %s revoked" % name)
2766            if v is not None:
2767                rv['value'] = v
2768            rv['proof'] = proof.to_dict()
2769            self.log.debug("[GetValue] got %s from %s" % (v, name))
2770            return rv
2771        else:
2772            raise service_error(service_error.access, "Access Denied",
2773                    proof=proof)
2774       
2775
2776    def SetValue(self, req, fid):
2777        """
2778        Set a value in the synchronized store
2779        """
2780        req = req.get('SetValueRequestBody', None)
2781        if not req:
2782            raise service_error(service_error.req,
2783                    "Bad request format (no SetValueRequestBody)")
2784       
2785        name = req.get('name', None)
2786        v = req.get('value', '')
2787
2788        if not name:
2789            raise service_error(service_error.req, "No name?")
2790
2791        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2792
2793        if access_ok:
2794            try:
2795                self.synch_store.set_value(name, v)
2796                self.write_store()
2797                self.log.debug("[SetValue] set %s to %s" % (name, v))
2798            except synch_store.CollisionError:
2799                # Translate into a service_error
2800                raise service_error(service_error.req,
2801                        "Value already set: %s" %name)
2802            except synch_store.RevokedKeyError:
2803                # No more synch on this key
2804                raise service_error(service_error.federant, 
2805                        "Synch key %s revoked" % name)
2806                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2807        else:
2808            raise service_error(service_error.access, "Access Denied",
2809                    proof=proof)
Note: See TracBrowser for help on using the repository browser.