source: fedd/federation/experiment_control.py @ 1ae1aa2

compt_changesinfo-ops
Last change on this file since 1ae1aa2 was 1ae1aa2, checked in by Ted Faber <faber@…>, 12 years ago

Reload interface works

  • Property mode set to 100644
File size: 91.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41import topdl
42import list_log
43from ip_allocator import ip_allocator
44from ip_addr import ip_addr
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        """
388
389        self.tbmap = { }
390        lineno =0
391        try:
392            f = open(file, "r")
393            for line in f:
394                lineno += 1
395                line = line.strip()
396                if line.startswith('#') or len(line) == 0:
397                    continue
398                try:
399                    label, url = line.split(':', 1)
400                    self.tbmap[label] = url
401                except ValueError, e:
402                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
403                            "map db: %s %s" % (lineno, line, e))
404        except EnvironmentError, e:
405            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
406                    "open %s: %s" % (file, e))
407        else:
408            f.close()
409
410    def read_store(self):
411        try:
412            self.synch_store = synch_store()
413            self.synch_store.load(self.store_filename)
414            self.log.debug("[read_store]: Read store from %s" % \
415                    self.store_filename)
416        except EnvironmentError, e:
417            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
418                    % (self.state_filename, e))
419            self.synch_store = synch_store()
420
421        # Set the initial permissions on data in the store.  XXX: This ad hoc
422        # authorization attribute initialization is getting out of hand.
423        # XXX: legacy
424        if self.auth_type == 'legacy':
425            for k in self.synch_store.all_keys():
426                try:
427                    if k.startswith('fedid:'):
428                        fid = fedid(hexstr=k[6:46])
429                        if self.state.has_key(fid):
430                            for a in self.get_alloc_ids(self.state[fid]):
431                                self.auth.set_attribute(a, k)
432                except ValueError, e:
433                    self.log.warn("Cannot deduce permissions for %s" % k)
434
435
436    def write_store(self):
437        """
438        Write a new copy of synch_store after writing current state
439        to a backup.  We use the internal synch_store pickle method to avoid
440        incinsistent data.
441
442        State format is a simple pickling of the store.
443        """
444        if os.access(self.store_filename, os.W_OK):
445            copy_file(self.store_filename, \
446                    "%s.bak" % self.store_filename)
447        try:
448            self.synch_store.save(self.store_filename)
449        except EnvironmentError, e:
450            self.log.error("Can't write file %s: %s" % \
451                    (self.store_filename, e))
452        except TypeError, e:
453            self.log.error("Pickling problem (TypeError): %s" % e)
454
455
456    def remove_dirs(self, dir):
457        """
458        Remove the directory tree and all files rooted at dir.  Log any errors,
459        but continue.
460        """
461        self.log.debug("[removedirs]: removing %s" % dir)
462        try:
463            for path, dirs, files in os.walk(dir, topdown=False):
464                for f in files:
465                    os.remove(os.path.join(path, f))
466                for d in dirs:
467                    os.rmdir(os.path.join(path, d))
468            os.rmdir(dir)
469        except EnvironmentError, e:
470            self.log.error("Error deleting directory tree in %s" % e);
471
472    @staticmethod
473    def make_temp_certfile(expcert, tmpdir):
474        """
475        make a protected copy of the access certificate so the experiment
476        controller can act as the experiment principal.  mkstemp is the most
477        secure way to do that. The directory should be created by
478        mkdtemp.  Return the filename.
479        """
480        if expcert and tmpdir:
481            try:
482                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
483                f = os.fdopen(certf, 'w')
484                print >> f, expcert
485                f.close()
486            except EnvironmentError, e:
487                raise service_error(service_error.internal, 
488                        "Cannot create temp cert file?")
489            return certfn
490        else:
491            return None
492
493       
494    def generate_ssh_keys(self, dest, type="rsa" ):
495        """
496        Generate a set of keys for the gateways to use to talk.
497
498        Keys are of type type and are stored in the required dest file.
499        """
500        valid_types = ("rsa", "dsa")
501        t = type.lower();
502        if t not in valid_types: raise ValueError
503        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
504
505        try:
506            trace = open("/dev/null", "w")
507        except EnvironmentError:
508            raise service_error(service_error.internal,
509                    "Cannot open /dev/null??");
510
511        # May raise CalledProcessError
512        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
513        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
514        if rv != 0:
515            raise service_error(service_error.internal, 
516                    "Cannot generate nonce ssh keys.  %s return code %d" \
517                            % (self.ssh_keygen, rv))
518
519    def generate_seer_certs(self, destdir):
520        '''
521        Create a SEER ca cert and a node cert in destdir/ca.pem and
522        destdir/node.pem respectively.  These will be distributed throughout
523        the federated experiment.  This routine reports errors via
524        service_errors.
525        '''
526        openssl = '/usr/bin/openssl'
527        # All the filenames and parameters we need for openssl calls below
528        ca_key =os.path.join(destdir, 'ca.key') 
529        ca_pem = os.path.join(destdir, 'ca.pem')
530        node_key =os.path.join(destdir, 'node.key') 
531        node_pem = os.path.join(destdir, 'node.pem')
532        node_req = os.path.join(destdir, 'node.req')
533        node_signed = os.path.join(destdir, 'node.signed')
534        days = '%s' % (365 * 10)
535        serial = '%s' % random.randint(0, 1<<16)
536
537        try:
538            # Sequence of calls to create a CA key, create a ca cert, create a
539            # node key, node signing request, and finally a signed node
540            # certificate.
541            sequence = (
542                    (openssl, 'genrsa', '-out', ca_key, '1024'),
543                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
544                        ca_pem, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
546                    (openssl, 'genrsa', '-out', node_key, '1024'),
547                    (openssl, 'req', '-new', '-key', node_key, '-out', 
548                        node_req, '-days', days, '-subj', 
549                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
550                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
551                        '-set_serial', serial, '-req', '-in', node_req, 
552                        '-out', node_signed, '-days', days),
553                )
554            # Do all that stuff; bail if there's an error, and push all the
555            # output to dev/null.
556            for cmd in sequence:
557                trace = open("/dev/null", "w")
558                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
559                if rv != 0:
560                    raise service_error(service_error.internal, 
561                            "Cannot generate SEER certs.  %s return code %d" \
562                                    % (' '.join(cmd), rv))
563            # Concatinate the node key and signed certificate into node.pem
564            f = open(node_pem, 'w')
565            for comp in (node_signed, node_key):
566                g = open(comp, 'r')
567                f.write(g.read())
568                g.close()
569            f.close()
570
571            # Throw out intermediaries.
572            for fn in (ca_key, node_key, node_req, node_signed):
573                os.unlink(fn)
574
575        except EnvironmentError, e:
576            # Any difficulties with the file system wind up here
577            raise service_error(service_error.internal,
578                    "File error on  %s while creating SEER certs: %s" % \
579                            (e.filename, e.strerror))
580
581
582
583    def gentopo(self, str):
584        """
585        Generate the topology data structure from the splitter's XML
586        representation of it.
587
588        The topology XML looks like:
589            <experiment>
590                <nodes>
591                    <node><vname></vname><ips>ip1:ip2</ips></node>
592                </nodes>
593                <lans>
594                    <lan>
595                        <vname></vname><vnode></vnode><ip></ip>
596                        <bandwidth></bandwidth><member>node:port</member>
597                    </lan>
598                </lans>
599        """
600        class topo_parse:
601            """
602            Parse the topology XML and create the dats structure.
603            """
604            def __init__(self):
605                # Typing of the subelements for data conversion
606                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
607                self.int_subelements = ( 'bandwidth',)
608                self.float_subelements = ( 'delay',)
609                # The final data structure
610                self.nodes = [ ]
611                self.lans =  [ ]
612                self.topo = { \
613                        'node': self.nodes,\
614                        'lan' : self.lans,\
615                    }
616                self.element = { }  # Current element being created
617                self.chars = ""     # Last text seen
618
619            def end_element(self, name):
620                # After each sub element the contents is added to the current
621                # element or to the appropriate list.
622                if name == 'node':
623                    self.nodes.append(self.element)
624                    self.element = { }
625                elif name == 'lan':
626                    self.lans.append(self.element)
627                    self.element = { }
628                elif name in self.str_subelements:
629                    self.element[name] = self.chars
630                    self.chars = ""
631                elif name in self.int_subelements:
632                    self.element[name] = int(self.chars)
633                    self.chars = ""
634                elif name in self.float_subelements:
635                    self.element[name] = float(self.chars)
636                    self.chars = ""
637
638            def found_chars(self, data):
639                self.chars += data.rstrip()
640
641
642        tp = topo_parse();
643        parser = xml.parsers.expat.ParserCreate()
644        parser.EndElementHandler = tp.end_element
645        parser.CharacterDataHandler = tp.found_chars
646
647        parser.Parse(str)
648
649        return tp.topo
650       
651
652    def genviz(self, topo):
653        """
654        Generate the visualization the virtual topology
655        """
656
657        neato = "/usr/local/bin/neato"
658        # These are used to parse neato output and to create the visualization
659        # file.
660        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
661        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
662                "%s</type></node>"
663
664        try:
665            # Node names
666            nodes = [ n['vname'] for n in topo['node'] ]
667            topo_lans = topo['lan']
668        except KeyError, e:
669            raise service_error(service_error.internal, "Bad topology: %s" %e)
670
671        lans = { }
672        links = { }
673
674        # Walk through the virtual topology, organizing the connections into
675        # 2-node connections (links) and more-than-2-node connections (lans).
676        # When a lan is created, it's added to the list of nodes (there's a
677        # node in the visualization for the lan).
678        for l in topo_lans:
679            if links.has_key(l['vname']):
680                if len(links[l['vname']]) < 2:
681                    links[l['vname']].append(l['vnode'])
682                else:
683                    nodes.append(l['vname'])
684                    lans[l['vname']] = links[l['vname']]
685                    del links[l['vname']]
686                    lans[l['vname']].append(l['vnode'])
687            elif lans.has_key(l['vname']):
688                lans[l['vname']].append(l['vnode'])
689            else:
690                links[l['vname']] = [ l['vnode'] ]
691
692
693        # Open up a temporary file for dot to turn into a visualization
694        try:
695            df, dotname = tempfile.mkstemp()
696            dotfile = os.fdopen(df, 'w')
697        except EnvironmentError:
698            raise service_error(service_error.internal,
699                    "Failed to open file in genviz")
700
701        try:
702            dnull = open('/dev/null', 'w')
703        except EnvironmentError:
704            service_error(service_error.internal,
705                    "Failed to open /dev/null in genviz")
706
707        # Generate a dot/neato input file from the links, nodes and lans
708        try:
709            print >>dotfile, "graph G {"
710            for n in nodes:
711                print >>dotfile, '\t"%s"' % n
712            for l in links.keys():
713                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
714            for l in lans.keys():
715                for n in lans[l]:
716                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
717            print >>dotfile, "}"
718            dotfile.close()
719        except TypeError:
720            raise service_error(service_error.internal,
721                    "Single endpoint link in vtopo")
722        except EnvironmentError:
723            raise service_error(service_error.internal, "Cannot write dot file")
724
725        # Use dot to create a visualization
726        try:
727            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
728                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
729                stderr=dnull, close_fds=True)
730        except EnvironmentError:
731            raise service_error(service_error.internal, 
732                    "Cannot generate visualization: is graphviz available?")
733        dnull.close()
734
735        # Translate dot to vis format
736        vis_nodes = [ ]
737        vis = { 'node': vis_nodes }
738        for line in dot.stdout:
739            m = vis_re.match(line)
740            if m:
741                vn = m.group(1)
742                vis_node = {'name': vn, \
743                        'x': float(m.group(2)),\
744                        'y' : float(m.group(3)),\
745                    }
746                if vn in links.keys() or vn in lans.keys():
747                    vis_node['type'] = 'lan'
748                else:
749                    vis_node['type'] = 'node'
750                vis_nodes.append(vis_node)
751        rv = dot.wait()
752
753        os.remove(dotname)
754        # XXX: graphviz seems to use low return codes for warnings, like
755        # "couldn't find font"
756        if rv < 2 : return vis
757        else: return None
758
759
760    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
761            cert_pwd=None):
762        """
763        Release access to testbed through fedd
764        """
765
766        if not uri and tbmap:
767            uri = tbmap.get(tb, None)
768        if not uri:
769            raise service_error(service_error.server_config, 
770                    "Unknown testbed: %s" % tb)
771
772        if self.local_access.has_key(uri):
773            resp = self.local_access[uri].ReleaseAccess(\
774                    { 'ReleaseAccessRequestBody' : 
775                        {'allocID': {'fedid': aid}},}, 
776                    fedid(file=cert_file))
777            resp = { 'ReleaseAccessResponseBody': resp } 
778        else:
779            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
780                    cert_file, cert_pwd, self.trusted_certs)
781
782        # better error coding
783
784    def remote_ns2topdl(self, uri, desc):
785
786        req = {
787                'description' : { 'ns2description': desc },
788            }
789
790        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
791                self.trusted_certs)
792
793        if r.has_key('Ns2TopdlResponseBody'):
794            r = r['Ns2TopdlResponseBody']
795            ed = r.get('experimentdescription', None)
796            if ed.has_key('topdldescription'):
797                return topdl.Topology(**ed['topdldescription'])
798            else:
799                raise service_error(service_error.protocol, 
800                        "Bad splitter response (no output)")
801        else:
802            raise service_error(service_error.protocol, "Bad splitter response")
803
804    class start_segment:
805        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
806                cert_pwd=None, trusted_certs=None, caller=None,
807                log_collector=None):
808            self.log = log
809            self.debug = debug
810            self.cert_file = cert_file
811            self.cert_pwd = cert_pwd
812            self.trusted_certs = None
813            self.caller = caller
814            self.testbed = testbed
815            self.log_collector = log_collector
816            self.response = None
817            self.node = { }
818            self.proof = None
819
820        def make_map(self, resp):
821            if 'segmentdescription' not in resp  or \
822                    'topdldescription' not in resp['segmentdescription']:
823                self.log.warn('No topology returned from startsegment')
824                return 
825
826            top = topdl.Topology(
827                    **resp['segmentdescription']['topdldescription'])
828
829            for e in top.elements:
830                if isinstance(e, topdl.Computer):
831                    self.node[e.name] = e
832
833        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
834            req = {
835                    'allocID': { 'fedid' : aid }, 
836                    'segmentdescription': { 
837                        'topdldescription': topo.to_dict(),
838                    },
839                }
840
841            if connInfo:
842                req['connection'] = connInfo
843
844            import_svcs = [ s for m in masters.values() \
845                    for s in m if self.testbed in s.importers]
846
847            if import_svcs or self.testbed in masters:
848                req['service'] = []
849
850            for s in import_svcs:
851                for r in s.reqs:
852                    sr = copy.deepcopy(r)
853                    sr['visibility'] = 'import';
854                    req['service'].append(sr)
855
856            for s in masters.get(self.testbed, []):
857                for r in s.reqs:
858                    sr = copy.deepcopy(r)
859                    sr['visibility'] = 'export';
860                    req['service'].append(sr)
861
862            if attrs:
863                req['fedAttr'] = attrs
864
865            try:
866                self.log.debug("Calling StartSegment at %s " % uri)
867                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
868                        self.trusted_certs)
869                if r.has_key('StartSegmentResponseBody'):
870                    lval = r['StartSegmentResponseBody'].get('allocationLog',
871                            None)
872                    if lval and self.log_collector:
873                        for line in  lval.splitlines(True):
874                            self.log_collector.write(line)
875                    self.make_map(r['StartSegmentResponseBody'])
876                    if 'proof' in r: self.proof = r['proof']
877                    self.response = r
878                else:
879                    raise service_error(service_error.internal, 
880                            "Bad response!?: %s" %r)
881                return True
882            except service_error, e:
883                self.log.error("Start segment failed on %s: %s" % \
884                        (self.testbed, e))
885                return False
886
887
888
889    class terminate_segment:
890        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
891                cert_pwd=None, trusted_certs=None, caller=None):
892            self.log = log
893            self.debug = debug
894            self.cert_file = cert_file
895            self.cert_pwd = cert_pwd
896            self.trusted_certs = None
897            self.caller = caller
898            self.testbed = testbed
899
900        def __call__(self, uri, aid ):
901            req = {
902                    'allocID': {'fedid': aid }, 
903                }
904            try:
905                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
906                        self.trusted_certs)
907                return True
908            except service_error, e:
909                self.log.error("Terminate segment failed on %s: %s" % \
910                        (self.testbed, e))
911                return False
912
913    class info_segment(start_segment):
914        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
915                cert_pwd=None, trusted_certs=None, caller=None,
916                log_collector=None):
917            experiment_control_local.start_segment.__init__(self, debug, 
918                    log, testbed, cert_file, cert_pwd, trusted_certs, 
919                    caller, log_collector)
920
921        def __call__(self, uri, aid):
922            req = { 'allocID': { 'fedid' : aid } }
923
924            try:
925                self.log.debug("Calling InfoSegment at %s " % uri)
926                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
927                        self.trusted_certs)
928                if r.has_key('InfoSegmentResponseBody'):
929                    self.make_map(r['InfoSegmentResponseBody'])
930                    if 'proof' in r: self.proof = r['proof']
931                    self.response = r
932                else:
933                    raise service_error(service_error.internal, 
934                            "Bad response!?: %s" %r)
935                return True
936            except service_error, e:
937                self.log.error("Info segment failed on %s: %s" % \
938                        (self.testbed, e))
939                return False
940
941    class operation_segment:
942        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
943                cert_pwd=None, trusted_certs=None, caller=None,
944                log_collector=None):
945            self.log = log
946            self.debug = debug
947            self.cert_file = cert_file
948            self.cert_pwd = cert_pwd
949            self.trusted_certs = None
950            self.caller = caller
951            self.testbed = testbed
952            self.status = None
953
954        def __call__(self, uri, aid, op, targets, params):
955            req = { 
956                    'allocID': { 'fedid' : aid },
957                    'operation': op,
958                    'target': targets,
959                    }
960            if params: req['parameter'] = params
961
962
963            try:
964                self.log.debug("Calling OperationSegment at %s " % uri)
965                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
966                        self.trusted_certs)
967                if 'OperationSegmentResponseBody' in r:
968                    r = r['OperationSegmentResponseBody']
969                    if 'status' in r:
970                        self.status = r['status']
971                else:
972                    raise service_error(service_error.internal, 
973                            "Bad response!?: %s" %r)
974                return True
975            except service_error, e:
976                self.log.error("Operation segment failed on %s: %s" % \
977                        (self.testbed, e))
978                return False
979
980    def annotate_topology(self, top, data):
981        def add_new(ann, attr):
982            for a in ann:
983                if a not in attr: attr.append(a)
984
985        # Annotate the topology with embedding info
986        for e in top.elements:
987            if isinstance(e, topdl.Computer):
988                for s in data:
989                    ann = s.node.get(e.name, None)
990                    if ann is not None:
991                        add_new(ann.localname, e.localname)
992                        e.status = ann.status
993                        add_new(ann.service, e.service)
994                        add_new(ann.operation, e.operation)
995                        if ann.os:
996                            e.os = ann.os
997                        break
998
999
1000
1001    def allocate_resources(self, allocated, masters, eid, expid, 
1002            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1003            attrs=None, connInfo={}, tbmap=None, expcert=None):
1004
1005        started = { }           # Testbeds where a sub-experiment started
1006                                # successfully
1007
1008        # XXX
1009        fail_soft = False
1010
1011        if tbmap is None: tbmap = { }
1012
1013        log = alloc_log or self.log
1014
1015        tp = thread_pool(self.nthreads)
1016        threads = [ ]
1017        starters = [ ]
1018
1019        if expcert:
1020            cert = expcert
1021            pw = None
1022        else:
1023            cert = self.cert_file
1024            pw = self.cert_pwd
1025
1026        for tb in allocated.keys():
1027            # Create and start a thread to start the segment, and save it
1028            # to get the return value later
1029            tb_attrs = copy.copy(attrs)
1030            tp.wait_for_slot()
1031            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1032            base, suffix = split_testbed(tb)
1033            if suffix:
1034                tb_attrs.append({'attribute': 'experiment_name', 
1035                    'value': "%s-%s" % (eid, suffix)})
1036            else:
1037                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1038            if not uri:
1039                raise service_error(service_error.internal, 
1040                        "Unknown testbed %s !?" % tb)
1041
1042            aid = tbparams[tb].allocID
1043            if not aid:
1044                raise service_error(service_error.internal, 
1045                        "No alloc id for testbed %s !?" % tb)
1046
1047            s = self.start_segment(log=log, debug=self.debug,
1048                    testbed=tb, cert_file=cert,
1049                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1050                    caller=self.call_StartSegment,
1051                    log_collector=log_collector)
1052            starters.append(s)
1053            t  = pooled_thread(\
1054                    target=s, name=tb,
1055                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1056                    pdata=tp, trace_file=self.trace_file)
1057            threads.append(t)
1058            t.start()
1059
1060        # Wait until all finish (keep pinging the log, though)
1061        mins = 0
1062        revoked = False
1063        while not tp.wait_for_all_done(60.0):
1064            mins += 1
1065            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1066                    % mins)
1067            if not revoked and \
1068                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1069                # a testbed has failed.  Revoke this experiment's
1070                # synchronizarion values so that sub experiments will not
1071                # deadlock waiting for synchronization that will never happen
1072                self.log.info("A subexperiment has failed to swap in, " + \
1073                        "revoking synch keys")
1074                var_key = "fedid:%s" % expid
1075                for k in self.synch_store.all_keys():
1076                    if len(k) > 45 and k[0:46] == var_key:
1077                        self.synch_store.revoke_key(k)
1078                revoked = True
1079
1080        failed = [ t.getName() for t in threads if not t.rv ]
1081        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1082
1083        # If one failed clean up, unless fail_soft is set
1084        if failed:
1085            if not fail_soft:
1086                tp.clear()
1087                for tb in succeeded:
1088                    # Create and start a thread to stop the segment
1089                    tp.wait_for_slot()
1090                    uri = tbparams[tb].uri
1091                    t  = pooled_thread(\
1092                            target=self.terminate_segment(log=log,
1093                                testbed=tb,
1094                                cert_file=cert, 
1095                                cert_pwd=pw,
1096                                trusted_certs=self.trusted_certs,
1097                                caller=self.call_TerminateSegment),
1098                            args=(uri, tbparams[tb].allocID),
1099                            name=tb,
1100                            pdata=tp, trace_file=self.trace_file)
1101                    t.start()
1102                # Wait until all finish (if any are being stopped)
1103                if succeeded:
1104                    tp.wait_for_all_done()
1105
1106                # release the allocations
1107                for tb in tbparams.keys():
1108                    try:
1109                        self.release_access(tb, tbparams[tb].allocID, 
1110                                tbmap=tbmap, uri=tbparams[tb].uri,
1111                                cert_file=cert, cert_pwd=pw)
1112                    except service_error, e:
1113                        self.log.warn("Error releasing access: %s" % e.desc)
1114                # Remove the placeholder
1115                self.state_lock.acquire()
1116                self.state[eid].status = 'failed'
1117                self.state[eid].updated()
1118                if self.state_filename: self.write_state()
1119                self.state_lock.release()
1120                # Remove the repo dir
1121                self.remove_dirs("%s/%s" %(self.repodir, expid))
1122                # Walk up tmpdir, deleting as we go
1123                if self.cleanup:
1124                    self.remove_dirs(tmpdir)
1125                else:
1126                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1127
1128
1129                log.error("Swap in failed on %s" % ",".join(failed))
1130                return
1131        else:
1132            # Walk through the successes and gather the proofs
1133            proofs = { }
1134            for s in starters:
1135                if s.proof:
1136                    proofs[s.testbed] = s.proof
1137            self.annotate_topology(top, starters)
1138            log.info("[start_segment]: Experiment %s active" % eid)
1139
1140
1141        # Walk up tmpdir, deleting as we go
1142        if self.cleanup:
1143            self.remove_dirs(tmpdir)
1144        else:
1145            log.debug("[start_experiment]: not removing %s" % tmpdir)
1146
1147        # Insert the experiment into our state and update the disk copy.
1148        self.state_lock.acquire()
1149        self.state[expid].status = 'active'
1150        self.state[eid] = self.state[expid]
1151        self.state[eid].top = top
1152        self.state[eid].updated()
1153        # Append startup proofs
1154        for f in self.state[eid].get_all_allocations():
1155            if f.tb in proofs:
1156                f.proof.append(proofs[f.tb])
1157
1158        if self.state_filename: self.write_state()
1159        self.state_lock.release()
1160        return
1161
1162
1163    def add_kit(self, e, kit):
1164        """
1165        Add a Software object created from the list of (install, location)
1166        tuples passed as kit  to the software attribute of an object e.  We
1167        do this enough to break out the code, but it's kind of a hack to
1168        avoid changing the old tuple rep.
1169        """
1170
1171        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1172
1173        if isinstance(e.software, list): e.software.extend(s)
1174        else: e.software = s
1175
1176    def append_experiment_authorization(self, expid, attrs, 
1177            need_state_lock=True):
1178        """
1179        Append the authorization information to system state
1180        """
1181
1182        for p, a in attrs:
1183            self.auth.set_attribute(p, a)
1184        self.auth.save()
1185
1186        if need_state_lock: self.state_lock.acquire()
1187        # XXX: really a no op?
1188        #self.state[expid]['auth'].update(attrs)
1189        if self.state_filename: self.write_state()
1190        if need_state_lock: self.state_lock.release()
1191
1192    def clear_experiment_authorization(self, expid, need_state_lock=True):
1193        """
1194        Attrs is a set of attribute principal pairs that need to be removed
1195        from the authenticator.  Remove them and save the authenticator.
1196        """
1197
1198        if need_state_lock: self.state_lock.acquire()
1199        # XXX: should be a no-op
1200        #if expid in self.state and 'auth' in self.state[expid]:
1201            #for p, a in self.state[expid]['auth']:
1202                #self.auth.unset_attribute(p, a)
1203            #self.state[expid]['auth'] = set()
1204        if self.state_filename: self.write_state()
1205        if need_state_lock: self.state_lock.release()
1206        self.auth.save()
1207
1208
1209    def create_experiment_state(self, fid, req, expid, expcert,
1210            state='starting'):
1211        """
1212        Create the initial entry in the experiment's state.  The expid and
1213        expcert are the experiment's fedid and certifacte that represents that
1214        ID, which are installed in the experiment state.  If the request
1215        includes a suggested local name that is used if possible.  If the local
1216        name is already taken by an experiment owned by this user that has
1217        failed, it is overwritten.  Otherwise new letters are added until a
1218        valid localname is found.  The generated local name is returned.
1219        """
1220
1221        if req.has_key('experimentID') and \
1222                req['experimentID'].has_key('localname'):
1223            overwrite = False
1224            eid = req['experimentID']['localname']
1225            # If there's an old failed experiment here with the same local name
1226            # and accessible by this user, we'll overwrite it, otherwise we'll
1227            # fall through and do the collision avoidance.
1228            old_expid = self.get_experiment_fedid(eid)
1229            if old_expid:
1230                users_experiment = True
1231                try:
1232                    self.check_experiment_access(fid, old_expid)
1233                except service_error, e:
1234                    if e.code == service_error.access: users_experiment = False
1235                    else: raise e
1236                if users_experiment:
1237                    self.state_lock.acquire()
1238                    status = self.state[eid].status
1239                    if status and status == 'failed':
1240                        # remove the old access attributes
1241                        self.clear_experiment_authorization(eid,
1242                                need_state_lock=False)
1243                        overwrite = True
1244                        del self.state[eid]
1245                        del self.state[old_expid]
1246                    self.state_lock.release()
1247                else:
1248                    self.log.info('Experiment %s exists, ' % eid + \
1249                            'but this user cannot access it')
1250            self.state_lock.acquire()
1251            while (self.state.has_key(eid) and not overwrite):
1252                eid += random.choice(string.ascii_letters)
1253            # Initial state
1254            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1255                    identity=expcert)
1256            self.state[expid] = self.state[eid]
1257            if self.state_filename: self.write_state()
1258            self.state_lock.release()
1259        else:
1260            eid = self.exp_stem
1261            for i in range(0,5):
1262                eid += random.choice(string.ascii_letters)
1263            self.state_lock.acquire()
1264            while (self.state.has_key(eid)):
1265                eid = self.exp_stem
1266                for i in range(0,5):
1267                    eid += random.choice(string.ascii_letters)
1268            # Initial state
1269            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1270                    identity=expcert)
1271            self.state[expid] = self.state[eid]
1272            if self.state_filename: self.write_state()
1273            self.state_lock.release()
1274
1275        # Let users touch the state.  Authorize this fid and the expid itself
1276        # to touch the experiment, as well as allowing th eoverrides.
1277        self.append_experiment_authorization(eid, 
1278                set([(fid, expid), (expid,expid)] + \
1279                        [ (o, expid) for o in self.overrides]))
1280
1281        return eid
1282
1283
1284    def allocate_ips_to_topo(self, top):
1285        """
1286        Add an ip4_address attribute to all the hosts in the topology, based on
1287        the shared substrates on which they sit.  An /etc/hosts file is also
1288        created and returned as a list of hostfiles entries.  We also return
1289        the allocator, because we may need to allocate IPs to portals
1290        (specifically DRAGON portals).
1291        """
1292        subs = sorted(top.substrates, 
1293                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1294                reverse=True)
1295        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1296        ifs = { }
1297        hosts = [ ]
1298
1299        for idx, s in enumerate(subs):
1300            net_size = len(s.interfaces)+2
1301
1302            a = ips.allocate(net_size)
1303            if a :
1304                base, num = a
1305                if num < net_size: 
1306                    raise service_error(service_error.internal,
1307                            "Allocator returned wrong number of IPs??")
1308            else:
1309                raise service_error(service_error.req, 
1310                        "Cannot allocate IP addresses")
1311            mask = ips.min_alloc
1312            while mask < net_size:
1313                mask *= 2
1314
1315            netmask = ((2**32-1) ^ (mask-1))
1316
1317            base += 1
1318            for i in s.interfaces:
1319                i.attribute.append(
1320                        topdl.Attribute('ip4_address', 
1321                            "%s" % ip_addr(base)))
1322                i.attribute.append(
1323                        topdl.Attribute('ip4_netmask', 
1324                            "%s" % ip_addr(int(netmask))))
1325
1326                hname = i.element.name
1327                if ifs.has_key(hname):
1328                    hosts.append("%s\t%s-%s %s-%d" % \
1329                            (ip_addr(base), hname, s.name, hname,
1330                                ifs[hname]))
1331                else:
1332                    ifs[hname] = 0
1333                    hosts.append("%s\t%s-%s %s-%d %s" % \
1334                            (ip_addr(base), hname, s.name, hname,
1335                                ifs[hname], hname))
1336
1337                ifs[hname] += 1
1338                base += 1
1339        return hosts, ips
1340
1341    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1342            tbparam, masters, tbmap, expid=None, expcert=None):
1343        for tb in testbeds:
1344            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1345                    expcert)
1346            allocated[tb] = 1
1347
1348    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1349            expcert=None):
1350        """
1351        Get access to testbed through fedd and set the parameters for that tb
1352        """
1353        def get_export_project(svcs):
1354            """
1355            Look through for the list of federated_service for this testbed
1356            objects for a project_export service, and extract the project
1357            parameter.
1358            """
1359
1360            pe = [s for s in svcs if s.name=='project_export']
1361            if len(pe) == 1:
1362                return pe[0].params.get('project', None)
1363            elif len(pe) == 0:
1364                return None
1365            else:
1366                raise service_error(service_error.req,
1367                        "More than one project export is not supported")
1368
1369        def add_services(svcs, type, slist, keys):
1370            """
1371            Add the given services to slist.  type is import or export.  Also
1372            add a mapping entry from the assigned id to the original service
1373            record.
1374            """
1375            for i, s in enumerate(svcs):
1376                idx = '%s%d' % (type, i)
1377                keys[idx] = s
1378                sr = {'id': idx, 'name': s.name, 'visibility': type }
1379                if s.params:
1380                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1381                            for k, v in s.params.items()]
1382                slist.append(sr)
1383
1384        uri = tbmap.get(testbed_base(tb), None)
1385        if not uri:
1386            raise service_error(service_error.server_config, 
1387                    "Unknown testbed: %s" % tb)
1388
1389        export_svcs = masters.get(tb,[])
1390        import_svcs = [ s for m in masters.values() \
1391                for s in m \
1392                    if tb in s.importers ]
1393
1394        export_project = get_export_project(export_svcs)
1395        # Compose the credential list so that IDs come before attributes
1396        creds = set()
1397        keys = set()
1398        certs = self.auth.get_creds_for_principal(fid)
1399        if expid:
1400            certs.update(self.auth.get_creds_for_principal(expid))
1401        for c in certs:
1402            keys.add(c.issuer_cert())
1403            creds.add(c.attribute_cert())
1404        creds = list(keys) + list(creds)
1405
1406        if expcert: cert, pw = expcert, None
1407        else: cert, pw = self.cert_file, self.cert_pw
1408
1409        # Request credentials
1410        req = {
1411                'abac_credential': creds,
1412            }
1413        # Make the service request from the services we're importing and
1414        # exporting.  Keep track of the export request ids so we can
1415        # collect the resulting info from the access response.
1416        e_keys = { }
1417        if import_svcs or export_svcs:
1418            slist = []
1419            add_services(import_svcs, 'import', slist, e_keys)
1420            add_services(export_svcs, 'export', slist, e_keys)
1421            req['service'] = slist
1422
1423        if self.local_access.has_key(uri):
1424            # Local access call
1425            req = { 'RequestAccessRequestBody' : req }
1426            r = self.local_access[uri].RequestAccess(req, 
1427                    fedid(file=self.cert_file))
1428            r = { 'RequestAccessResponseBody' : r }
1429        else:
1430            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1431
1432        if r.has_key('RequestAccessResponseBody'):
1433            # Through to here we have a valid response, not a fault.
1434            # Access denied is a fault, so something better or worse than
1435            # access denied has happened.
1436            r = r['RequestAccessResponseBody']
1437            self.log.debug("[get_access] Access granted")
1438        else:
1439            raise service_error(service_error.protocol,
1440                        "Bad proxy response")
1441        if 'proof' not in r:
1442            raise service_error(service_error.protocol,
1443                        "Bad access response (no access proof)")
1444
1445        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1446                tb=tb, uri=uri, proof=[r['proof']], 
1447                services=masters.get(tb, None))
1448
1449        # Collect the responses corresponding to the services this testbed
1450        # exports.  These will be the service requests that we will include in
1451        # the start segment requests (with appropriate visibility values) to
1452        # import and export the segments.
1453        for s in r.get('service', []):
1454            id = s.get('id', None)
1455            # Note that this attaches the response to the object in the masters
1456            # data structure.  (The e_keys index disappears when this fcn
1457            # returns)
1458            if id and id in e_keys:
1459                e_keys[id].reqs.append(s)
1460
1461        # Add attributes to parameter space.  We don't allow attributes to
1462        # overlay any parameters already installed.
1463        for a in r.get('fedAttr', []):
1464            try:
1465                if a['attribute']:
1466                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1467            except KeyError:
1468                self.log.error("Bad attribute in response: %s" % a)
1469
1470
1471    def split_topology(self, top, topo, testbeds):
1472        """
1473        Create the sub-topologies that are needed for experiment instantiation.
1474        """
1475        for tb in testbeds:
1476            topo[tb] = top.clone()
1477            # copy in for loop allows deletions from the original
1478            for e in [ e for e in topo[tb].elements]:
1479                etb = e.get_attribute('testbed')
1480                # NB: elements without a testbed attribute won't appear in any
1481                # sub topologies. 
1482                if not etb or etb != tb:
1483                    for i in e.interface:
1484                        for s in i.subs:
1485                            try:
1486                                s.interfaces.remove(i)
1487                            except ValueError:
1488                                raise service_error(service_error.internal,
1489                                        "Can't remove interface??")
1490                    topo[tb].elements.remove(e)
1491            topo[tb].make_indices()
1492
1493    def confirm_software(self, top):
1494        """
1495        Make sure that the software to be loaded in the topo is all available
1496        before we begin making access requests, etc.  This is a subset of
1497        wrangle_software.
1498        """
1499        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1500        pkgs.update([x.location for e in top.elements for x in e.software])
1501
1502        for pkg in pkgs:
1503            loc = pkg
1504
1505            scheme, host, path = urlparse(loc)[0:3]
1506            dest = os.path.basename(path)
1507            if not scheme:
1508                if not loc.startswith('/'):
1509                    loc = "/%s" % loc
1510                loc = "file://%s" %loc
1511            # NB: if scheme was found, loc == pkg
1512            try:
1513                u = urlopen(loc)
1514                u.close()
1515            except Exception, e:
1516                raise service_error(service_error.req, 
1517                        "Cannot open %s: %s" % (loc, e))
1518        return True
1519
1520    def wrangle_software(self, expid, top, topo, tbparams):
1521        """
1522        Copy software out to the repository directory, allocate permissions and
1523        rewrite the segment topologies to look for the software in local
1524        places.
1525        """
1526
1527        # Copy the rpms and tarfiles to a distribution directory from
1528        # which the federants can retrieve them
1529        linkpath = "%s/software" %  expid
1530        softdir ="%s/%s" % ( self.repodir, linkpath)
1531        softmap = { }
1532
1533        # self.fedkit and self.gateway kit are lists of tuples of
1534        # (install_location, download_location) this extracts the download
1535        # locations.
1536        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1537        pkgs.update([x.location for e in top.elements for x in e.software])
1538        try:
1539            os.makedirs(softdir)
1540        except EnvironmentError, e:
1541            raise service_error(
1542                    "Cannot create software directory: %s" % e)
1543        # The actual copying.  Everything's converted into a url for copying.
1544        auth_attrs = set()
1545        for pkg in pkgs:
1546            loc = pkg
1547
1548            scheme, host, path = urlparse(loc)[0:3]
1549            dest = os.path.basename(path)
1550            if not scheme:
1551                if not loc.startswith('/'):
1552                    loc = "/%s" % loc
1553                loc = "file://%s" %loc
1554            # NB: if scheme was found, loc == pkg
1555            try:
1556                u = urlopen(loc)
1557            except Exception, e:
1558                raise service_error(service_error.req, 
1559                        "Cannot open %s: %s" % (loc, e))
1560            try:
1561                f = open("%s/%s" % (softdir, dest) , "w")
1562                self.log.debug("Writing %s/%s" % (softdir,dest) )
1563                data = u.read(4096)
1564                while data:
1565                    f.write(data)
1566                    data = u.read(4096)
1567                f.close()
1568                u.close()
1569            except Exception, e:
1570                raise service_error(service_error.internal,
1571                        "Could not copy %s: %s" % (loc, e))
1572            path = re.sub("/tmp", "", linkpath)
1573            # XXX
1574            softmap[pkg] = \
1575                    "%s/%s/%s" %\
1576                    ( self.repo_url, path, dest)
1577
1578            # Allow the individual segments to access the software by assigning
1579            # an attribute to each testbed allocation that encodes the data to
1580            # be released.  This expression collects the data for each run of
1581            # the loop.
1582            auth_attrs.update([
1583                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1584                        for tb in tbparams.keys()])
1585
1586        self.append_experiment_authorization(expid, auth_attrs)
1587
1588        # Convert the software locations in the segments into the local
1589        # copies on this host
1590        for soft in [ s for tb in topo.values() \
1591                for e in tb.elements \
1592                    if getattr(e, 'software', False) \
1593                        for s in e.software ]:
1594            if softmap.has_key(soft.location):
1595                soft.location = softmap[soft.location]
1596
1597
1598    def new_experiment(self, req, fid):
1599        """
1600        The external interface to empty initial experiment creation called from
1601        the dispatcher.
1602
1603        Creates a working directory, splits the incoming description using the
1604        splitter script and parses out the avrious subsections using the
1605        lcasses above.  Once each sub-experiment is created, use pooled threads
1606        to instantiate them and start it all up.
1607        """
1608        req = req.get('NewRequestBody', None)
1609        if not req:
1610            raise service_error(service_error.req,
1611                    "Bad request format (no NewRequestBody)")
1612
1613        if self.auth.import_credentials(data_list=req.get('credential', [])):
1614            self.auth.save()
1615
1616        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1617                with_proof=True)
1618
1619        if not access_ok:
1620            raise service_error(service_error.access, "New access denied",
1621                    proof=[proof])
1622
1623        try:
1624            tmpdir = tempfile.mkdtemp(prefix="split-")
1625        except EnvironmentError:
1626            raise service_error(service_error.internal, "Cannot create tmp dir")
1627
1628        try:
1629            access_user = self.accessdb[fid]
1630        except KeyError:
1631            raise service_error(service_error.internal,
1632                    "Access map and authorizer out of sync in " + \
1633                            "new_experiment for fedid %s"  % fid)
1634
1635        # Generate an ID for the experiment (slice) and a certificate that the
1636        # allocator can use to prove they own it.  We'll ship it back through
1637        # the encrypted connection.  If the requester supplied one, use it.
1638        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1639            expcert = req['experimentAccess']['X509']
1640            expid = fedid(certstr=expcert)
1641            self.state_lock.acquire()
1642            if expid in self.state:
1643                self.state_lock.release()
1644                raise service_error(service_error.req, 
1645                        'fedid %s identifies an existing experiment' % expid)
1646            self.state_lock.release()
1647        else:
1648            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1649
1650        #now we're done with the tmpdir, and it should be empty
1651        if self.cleanup:
1652            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1653            os.rmdir(tmpdir)
1654        else:
1655            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1656
1657        eid = self.create_experiment_state(fid, req, expid, expcert, 
1658                state='empty')
1659
1660        rv = {
1661                'experimentID': [
1662                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1663                ],
1664                'experimentStatus': 'empty',
1665                'experimentAccess': { 'X509' : expcert },
1666                'proof': proof.to_dict(),
1667            }
1668
1669        return rv
1670
1671    # create_experiment sub-functions
1672
1673    @staticmethod
1674    def get_experiment_key(req, field='experimentID'):
1675        """
1676        Parse the experiment identifiers out of the request (the request body
1677        tag has been removed).  Specifically this pulls either the fedid or the
1678        localname out of the experimentID field.  A fedid is preferred.  If
1679        neither is present or the request does not contain the fields,
1680        service_errors are raised.
1681        """
1682        # Get the experiment access
1683        exp = req.get(field, None)
1684        if exp:
1685            if exp.has_key('fedid'):
1686                key = exp['fedid']
1687            elif exp.has_key('localname'):
1688                key = exp['localname']
1689            else:
1690                raise service_error(service_error.req, "Unknown lookup type")
1691        else:
1692            raise service_error(service_error.req, "No request?")
1693
1694        return key
1695
1696    def get_experiment_ids_and_start(self, key, tmpdir):
1697        """
1698        Get the experiment name, id and access certificate from the state, and
1699        set the experiment state to 'starting'.  returns a triple (fedid,
1700        localname, access_cert_file). The access_cert_file is a copy of the
1701        contents of the access certificate, created in the tempdir with
1702        restricted permissions.  If things are confused, raise an exception.
1703        """
1704
1705        expid = eid = None
1706        self.state_lock.acquire()
1707        if key in self.state:
1708            exp = self.state[key]
1709            exp.status = "starting"
1710            exp.updated()
1711            expid = exp.fedid
1712            eid = exp.localname
1713            expcert = exp.identity
1714        self.state_lock.release()
1715
1716        # make a protected copy of the access certificate so the experiment
1717        # controller can act as the experiment principal.
1718        if expcert:
1719            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1720            if not expcert_file:
1721                raise service_error(service_error.internal, 
1722                        "Cannot create temp cert file?")
1723        else:
1724            expcert_file = None
1725
1726        return (eid, expid, expcert_file)
1727
1728    def get_topology(self, req, tmpdir):
1729        """
1730        Get the ns2 content and put it into a file for parsing.  Call the local
1731        or remote parser and return the topdl.Topology.  Errors result in
1732        exceptions.  req is the request and tmpdir is a work directory.
1733        """
1734
1735        # The tcl parser needs to read a file so put the content into that file
1736        descr=req.get('experimentdescription', None)
1737        if descr:
1738            if 'ns2description' in descr:
1739                file_content=descr['ns2description']
1740            elif 'topdldescription' in descr:
1741                return topdl.Topology(**descr['topdldescription'])
1742            else:
1743                raise service_error(service_error.req, 
1744                        'Unknown experiment description type')
1745        else:
1746            raise service_error(service_error.req, "No experiment description")
1747
1748
1749        if self.splitter_url:
1750            self.log.debug("Calling remote topdl translator at %s" % \
1751                    self.splitter_url)
1752            top = self.remote_ns2topdl(self.splitter_url, file_content)
1753        else:
1754            tclfile = os.path.join(tmpdir, "experiment.tcl")
1755            if file_content:
1756                try:
1757                    f = open(tclfile, 'w')
1758                    f.write(file_content)
1759                    f.close()
1760                except EnvironmentError:
1761                    raise service_error(service_error.internal,
1762                            "Cannot write temp experiment description")
1763            else:
1764                raise service_error(service_error.req, 
1765                        "Only ns2descriptions supported")
1766            pid = "dummy"
1767            gid = "dummy"
1768            eid = "dummy"
1769
1770            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1771                str(self.muxmax), '-m', 'dummy']
1772
1773            tclcmd.extend([pid, gid, eid, tclfile])
1774
1775            self.log.debug("running local splitter %s", " ".join(tclcmd))
1776            # This is just fantastic.  As a side effect the parser copies
1777            # tb_compat.tcl into the current directory, so that directory
1778            # must be writable by the fedd user.  Doing this in the
1779            # temporary subdir ensures this is the case.
1780            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1781                    cwd=tmpdir)
1782            split_data = tclparser.stdout
1783
1784            top = topdl.topology_from_xml(file=split_data, top="experiment")
1785            os.remove(tclfile)
1786
1787        return top
1788
1789    def get_testbed_services(self, req, testbeds):
1790        """
1791        Parse the services section of the request into two dicts mapping
1792        testbed to lists of federated_service objects.  The first dict maps all
1793        exporters of services to those service objects, the second maps
1794        testbeds to service objects only for services requiring portals.
1795        """
1796        # We construct both dicts here because deriving the second is more
1797        # comples than it looks - both the keys and lists can differ, and it's
1798        # much easier to generate both in one pass.
1799        masters = { }
1800        pmasters = { }
1801        for s in req.get('service', []):
1802            # If this is a service request with the importall field
1803            # set, fill it out.
1804
1805            if s.get('importall', False):
1806                s['import'] = [ tb for tb in testbeds \
1807                        if tb not in s.get('export',[])]
1808                del s['importall']
1809
1810            # Add the service to masters
1811            for tb in s.get('export', []):
1812                if s.get('name', None):
1813
1814                    params = { }
1815                    for a in s.get('fedAttr', []):
1816                        params[a.get('attribute', '')] = a.get('value','')
1817
1818                    fser = federated_service(name=s['name'],
1819                            exporter=tb, importers=s.get('import',[]),
1820                            params=params)
1821                    if fser.name == 'hide_hosts' \
1822                            and 'hosts' not in fser.params:
1823                        fser.params['hosts'] = \
1824                                ",".join(tb_hosts.get(fser.exporter, []))
1825                    if tb in masters: masters[tb].append(fser)
1826                    else: masters[tb] = [fser]
1827
1828                    if fser.portal:
1829                        if tb in pmasters: pmasters[tb].append(fser)
1830                        else: pmasters[tb] = [fser]
1831                else:
1832                    self.log.error('Testbed service does not have name " + \
1833                            "and importers')
1834        return masters, pmasters
1835
1836    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1837        """
1838        Create the ssh keys necessary for interconnecting the portal nodes and
1839        the global hosts file for letting each segment know about the IP
1840        addresses in play.  Save these into the repo.  Add attributes to the
1841        autorizer allowing access controllers to download them and return a set
1842        of attributes that inform the segments where to find this stuff.  May
1843        raise service_errors in if there are problems.
1844        """
1845        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1846        gw_secretkey_base = "fed.%s" % self.ssh_type
1847        keydir = os.path.join(tmpdir, 'keys')
1848        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1849        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1850
1851        try:
1852            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1853        except ValueError:
1854            raise service_error(service_error.server_config, 
1855                    "Bad key type (%s)" % self.ssh_type)
1856
1857        self.generate_seer_certs(keydir)
1858
1859        # Copy configuration files into the remote file store
1860        # The config urlpath
1861        configpath = "/%s/config" % expid
1862        # The config file system location
1863        configdir ="%s%s" % ( self.repodir, configpath)
1864        try:
1865            os.makedirs(configdir)
1866        except EnvironmentError, e:
1867            raise service_error(service_error.internal,
1868                    "Cannot create config directory: %s" % e)
1869        try:
1870            f = open("%s/hosts" % configdir, "w")
1871            print >> f, string.join(hosts, '\n')
1872            f.close()
1873        except EnvironmentError, e:
1874            raise service_error(service_error.internal, 
1875                    "Cannot write hosts file: %s" % e)
1876        try:
1877            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1878            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1879            copy_file(os.path.join(keydir, 'ca.pem'), 
1880                    os.path.join(configdir, 'ca.pem'))
1881            copy_file(os.path.join(keydir, 'node.pem'), 
1882                    os.path.join(configdir, 'node.pem'))
1883        except EnvironmentError, e:
1884            raise service_error(service_error.internal, 
1885                    "Cannot copy keyfiles: %s" % e)
1886
1887        # Allow the individual testbeds to access the configuration files,
1888        # again by setting an attribute for the relevant pathnames on each
1889        # allocation principal.  Yeah, that's a long list comprehension.
1890        self.append_experiment_authorization(expid, set([
1891            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1892                    for tb in tbparams.keys() \
1893                        for f in ("hosts", 'ca.pem', 'node.pem', 
1894                            gw_secretkey_base, gw_pubkey_base)]))
1895
1896        attrs = [ 
1897                {
1898                    'attribute': 'ssh_pubkey', 
1899                    'value': '%s/%s/config/%s' % \
1900                            (self.repo_url, expid, gw_pubkey_base)
1901                },
1902                {
1903                    'attribute': 'ssh_secretkey', 
1904                    'value': '%s/%s/config/%s' % \
1905                            (self.repo_url, expid, gw_secretkey_base)
1906                },
1907                {
1908                    'attribute': 'hosts', 
1909                    'value': '%s/%s/config/hosts' % \
1910                            (self.repo_url, expid)
1911                },
1912                {
1913                    'attribute': 'seer_ca_pem', 
1914                    'value': '%s/%s/config/%s' % \
1915                            (self.repo_url, expid, 'ca.pem')
1916                },
1917                {
1918                    'attribute': 'seer_node_pem', 
1919                    'value': '%s/%s/config/%s' % \
1920                            (self.repo_url, expid, 'node.pem')
1921                },
1922            ]
1923        return attrs
1924
1925
1926    def get_vtopo(self, req, fid):
1927        """
1928        Return the stored virtual topology for this experiment
1929        """
1930        rv = None
1931        state = None
1932
1933        req = req.get('VtopoRequestBody', None)
1934        if not req:
1935            raise service_error(service_error.req,
1936                    "Bad request format (no VtopoRequestBody)")
1937        exp = req.get('experiment', None)
1938        if exp:
1939            if exp.has_key('fedid'):
1940                key = exp['fedid']
1941                keytype = "fedid"
1942            elif exp.has_key('localname'):
1943                key = exp['localname']
1944                keytype = "localname"
1945            else:
1946                raise service_error(service_error.req, "Unknown lookup type")
1947        else:
1948            raise service_error(service_error.req, "No request?")
1949
1950        proof = self.check_experiment_access(fid, key)
1951
1952        self.state_lock.acquire()
1953        # XXX: this needs to be recalculated
1954        if key in self.state:
1955            if self.state[key].top is not None:
1956                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1957                rv = { 'experiment' : {keytype: key },
1958                        'vtopo': vtopo,
1959                        'proof': proof.to_dict(), 
1960                    }
1961            else:
1962                state = self.state[key].status
1963        self.state_lock.release()
1964
1965        if rv: return rv
1966        else: 
1967            if state:
1968                raise service_error(service_error.partial, 
1969                        "Not ready: %s" % state)
1970            else:
1971                raise service_error(service_error.req, "No such experiment")
1972
1973    def get_vis(self, req, fid):
1974        """
1975        Return the stored visualization for this experiment
1976        """
1977        rv = None
1978        state = None
1979
1980        req = req.get('VisRequestBody', None)
1981        if not req:
1982            raise service_error(service_error.req,
1983                    "Bad request format (no VisRequestBody)")
1984        exp = req.get('experiment', None)
1985        if exp:
1986            if exp.has_key('fedid'):
1987                key = exp['fedid']
1988                keytype = "fedid"
1989            elif exp.has_key('localname'):
1990                key = exp['localname']
1991                keytype = "localname"
1992            else:
1993                raise service_error(service_error.req, "Unknown lookup type")
1994        else:
1995            raise service_error(service_error.req, "No request?")
1996
1997        proof = self.check_experiment_access(fid, key)
1998
1999        self.state_lock.acquire()
2000        # Generate the visualization
2001        if key in self.state:
2002            if self.state[key].top is not None:
2003                try:
2004                    vis = self.genviz(
2005                            topdl.topology_to_vtopo(self.state[key].topo))
2006                except service_error, e:
2007                    self.state_lock.release()
2008                    raise e
2009                rv =  { 'experiment' : {keytype: key },
2010                        'vis': vis,
2011                        'proof': proof.to_dict(), 
2012                        }
2013            else:
2014                state = self.state[key].status
2015        self.state_lock.release()
2016
2017        if rv: return rv
2018        else:
2019            if state:
2020                raise service_error(service_error.partial, 
2021                        "Not ready: %s" % state)
2022            else:
2023                raise service_error(service_error.req, "No such experiment")
2024
2025   
2026    def save_federant_information(self, allocated, tbparams, eid, top):
2027        """
2028        Store the various data that have changed in the experiment state
2029        between when it was started and the beginning of resource allocation.
2030        This is basically the information about each local allocation.  This
2031        fills in the values of the placeholder allocation in the state.  It
2032        also collects the access proofs and returns them as dicts for a
2033        response message.
2034        """
2035        self.state_lock.acquire()
2036        exp = self.state[eid]
2037        exp.top = top.clone()
2038        # save federant information
2039        for k in allocated.keys():
2040            exp.add_allocation(tbparams[k])
2041            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2042                type="testbed", localname=[k], 
2043                service=[ s.to_topdl() for s in tbparams[k].services]))
2044
2045        # Access proofs for the response message
2046        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2047                    for p in tbparams[k].proof]
2048        exp.updated()
2049        if self.state_filename: 
2050            self.write_state()
2051        self.state_lock.release()
2052        return proofs
2053
2054    def clear_placeholder(self, eid, expid, tmpdir):
2055        """
2056        Clear the placeholder and remove any allocated temporary dir.
2057        """
2058
2059        self.state_lock.acquire()
2060        del self.state[eid]
2061        del self.state[expid]
2062        if self.state_filename: self.write_state()
2063        self.state_lock.release()
2064        if tmpdir and self.cleanup:
2065            self.remove_dirs(tmpdir)
2066
2067    # end of create_experiment sub-functions
2068
2069    def create_experiment(self, req, fid):
2070        """
2071        The external interface to experiment creation called from the
2072        dispatcher.
2073
2074        Creates a working directory, splits the incoming description using the
2075        splitter script and parses out the various subsections using the
2076        classes above.  Once each sub-experiment is created, use pooled threads
2077        to instantiate them and start it all up.
2078        """
2079
2080        req = req.get('CreateRequestBody', None)
2081        if req:
2082            key = self.get_experiment_key(req)
2083        else:
2084            raise service_error(service_error.req,
2085                    "Bad request format (no CreateRequestBody)")
2086
2087        # Import information from the requester
2088        if self.auth.import_credentials(data_list=req.get('credential', [])):
2089            self.auth.save()
2090
2091        # Make sure that the caller can talk to us
2092        proof = self.check_experiment_access(fid, key)
2093
2094        # Install the testbed map entries supplied with the request into a copy
2095        # of the testbed map.
2096        tbmap = dict(self.tbmap)
2097        for m in req.get('testbedmap', []):
2098            if 'testbed' in m and 'uri' in m:
2099                tbmap[m['testbed']] = m['uri']
2100
2101        # a place to work
2102        try:
2103            tmpdir = tempfile.mkdtemp(prefix="split-")
2104            os.mkdir(tmpdir+"/keys")
2105        except EnvironmentError:
2106            raise service_error(service_error.internal, "Cannot create tmp dir")
2107
2108        tbparams = { }
2109
2110        eid, expid, expcert_file = \
2111                self.get_experiment_ids_and_start(key, tmpdir)
2112
2113        # This catches exceptions to clear the placeholder if necessary
2114        try: 
2115            if not (eid and expid):
2116                raise service_error(service_error.internal, 
2117                        "Cannot find local experiment info!?")
2118
2119            top = self.get_topology(req, tmpdir)
2120            self.confirm_software(top)
2121            # Assign the IPs
2122            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2123            # Find the testbeds to look up
2124            tb_hosts = { }
2125            testbeds = [ ]
2126            for e in top.elements:
2127                if isinstance(e, topdl.Computer):
2128                    tb = e.get_attribute('testbed') or 'default'
2129                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2130                    else: 
2131                        tb_hosts[tb] = [ e.name ]
2132                        testbeds.append(tb)
2133
2134            masters, pmasters = self.get_testbed_services(req, testbeds)
2135            allocated = { }         # Testbeds we can access
2136            topo ={ }               # Sub topologies
2137            connInfo = { }          # Connection information
2138
2139            self.split_topology(top, topo, testbeds)
2140
2141            self.get_access_to_testbeds(testbeds, fid, allocated, 
2142                    tbparams, masters, tbmap, expid, expcert_file)
2143
2144            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2145
2146            part = experiment_partition(self.auth, self.store_url, tbmap,
2147                    self.muxmax, self.direct_transit)
2148            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2149                    connInfo, expid)
2150
2151            auth_attrs = set()
2152            # Now get access to the dynamic testbeds (those added above)
2153            for tb in [ t for t in topo if t not in allocated]:
2154                self.get_access(tb, tbparams, fid, masters, tbmap, 
2155                        expid, expcert_file)
2156                allocated[tb] = 1
2157                store_keys = topo[tb].get_attribute('store_keys')
2158                # Give the testbed access to keys it exports or imports
2159                if store_keys:
2160                    auth_attrs.update(set([
2161                        (tbparams[tb].allocID, sk) \
2162                                for sk in store_keys.split(" ")]))
2163
2164            if auth_attrs:
2165                self.append_experiment_authorization(expid, auth_attrs)
2166
2167            # transit and disconnected testbeds may not have a connInfo entry.
2168            # Fill in the blanks.
2169            for t in allocated.keys():
2170                if not connInfo.has_key(t):
2171                    connInfo[t] = { }
2172
2173            self.wrangle_software(expid, top, topo, tbparams)
2174
2175            proofs = self.save_federant_information(allocated, tbparams, 
2176                    eid, top)
2177        except service_error, e:
2178            # If something goes wrong in the parse (usually an access error)
2179            # clear the placeholder state.  From here on out the code delays
2180            # exceptions.  Failing at this point returns a fault to the remote
2181            # caller.
2182            self.clear_placeholder(eid, expid, tmpdir)
2183            raise e
2184
2185        # Start the background swapper and return the starting state.  From
2186        # here on out, the state will stick around a while.
2187
2188        # Create a logger that logs to the experiment's state object as well as
2189        # to the main log file.
2190        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2191        alloc_collector = self.list_log(self.state[eid].log)
2192        h = logging.StreamHandler(alloc_collector)
2193        # XXX: there should be a global one of these rather than repeating the
2194        # code.
2195        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2196                    '%d %b %y %H:%M:%S'))
2197        alloc_log.addHandler(h)
2198
2199        # Start a thread to do the resource allocation
2200        t  = Thread(target=self.allocate_resources,
2201                args=(allocated, masters, eid, expid, tbparams, 
2202                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2203                    connInfo, tbmap, expcert_file),
2204                name=eid)
2205        t.start()
2206
2207        rv = {
2208                'experimentID': [
2209                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2210                ],
2211                'experimentStatus': 'starting',
2212                'proof': [ proof.to_dict() ] + proofs,
2213            }
2214
2215        return rv
2216   
2217    def get_experiment_fedid(self, key):
2218        """
2219        find the fedid associated with the localname key in the state database.
2220        """
2221
2222        rv = None
2223        self.state_lock.acquire()
2224        if key in self.state:
2225            rv = self.state[key].fedid
2226        self.state_lock.release()
2227        return rv
2228
2229    def check_experiment_access(self, fid, key):
2230        """
2231        Confirm that the fid has access to the experiment.  Though a request
2232        may be made in terms of a local name, the access attribute is always
2233        the experiment's fedid.
2234        """
2235        if not isinstance(key, fedid):
2236            key = self.get_experiment_fedid(key)
2237
2238        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2239
2240        if access_ok:
2241            return proof
2242        else:
2243            raise service_error(service_error.access, "Access Denied",
2244                proof)
2245
2246
2247    def get_handler(self, path, fid):
2248        """
2249        Perhaps surprisingly named, this function handles HTTP GET requests to
2250        this server (SOAP requests are POSTs).
2251        """
2252        self.log.info("Get handler %s %s" % (path, fid))
2253        # XXX: log proofs?
2254        if self.auth.check_attribute(fid, path):
2255            return ("%s/%s" % (self.repodir, path), "application/binary")
2256        else:
2257            return (None, None)
2258
2259    def update_info(self, key, force=False):
2260        top = None
2261        self.state_lock.acquire()
2262        if key in self.state:
2263            if force or self.state[key].older_than(self.info_cache_limit):
2264                top = self.state[key].top
2265                if top is not None: top = top.clone()
2266                d1, info_params, cert, d2 = \
2267                        self.get_segment_info(self.state[key], need_lock=False)
2268        self.state_lock.release()
2269
2270        if top is None: return
2271
2272        try:
2273            tmpdir = tempfile.mkdtemp(prefix="info-")
2274        except EnvironmentError:
2275            raise service_error(service_error.internal, 
2276                    "Cannot create tmp dir")
2277        cert_file = self.make_temp_certfile(cert, tmpdir)
2278
2279        data = []
2280        try:
2281            for k, (uri, aid) in info_params.items():
2282                info=self.info_segment(log=self.log, testbed=uri,
2283                            cert_file=cert_file, cert_pwd=None,
2284                            trusted_certs=self.trusted_certs,
2285                            caller=self.call_InfoSegment)
2286                info(uri, aid)
2287                data.append(info)
2288        # Clean up the tmpdir no matter what
2289        finally:
2290            if tmpdir: self.remove_dirs(tmpdir)
2291
2292        self.annotate_topology(top, data)
2293        self.state_lock.acquire()
2294        if key in self.state:
2295            self.state[key].top = top
2296            self.state[key].updated()
2297            if self.state_filename: self.write_state()
2298        self.state_lock.release()
2299
2300   
2301    def get_info(self, req, fid):
2302        """
2303        Return all the stored info about this experiment
2304        """
2305        rv = None
2306
2307        req = req.get('InfoRequestBody', None)
2308        if not req:
2309            raise service_error(service_error.req,
2310                    "Bad request format (no InfoRequestBody)")
2311        exp = req.get('experiment', None)
2312        legacy = req.get('legacy', False)
2313        fresh = req.get('fresh', False)
2314        if exp:
2315            if exp.has_key('fedid'):
2316                key = exp['fedid']
2317                keytype = "fedid"
2318            elif exp.has_key('localname'):
2319                key = exp['localname']
2320                keytype = "localname"
2321            else:
2322                raise service_error(service_error.req, "Unknown lookup type")
2323        else:
2324            raise service_error(service_error.req, "No request?")
2325
2326        proof = self.check_experiment_access(fid, key)
2327
2328        self.update_info(key, fresh)
2329
2330        self.state_lock.acquire()
2331        if self.state.has_key(key):
2332            rv = self.state[key].get_info()
2333            # Copy the topo if we need legacy annotations
2334            if legacy:
2335                top = self.state[key].top
2336                if top is not None: top = top.clone()
2337        self.state_lock.release()
2338
2339        # If the legacy visualization and topology representations are
2340        # requested, calculate them and add them to the return.
2341        if legacy and rv is not None:
2342            if top is not None:
2343                vtopo = topdl.topology_to_vtopo(top)
2344                if vtopo is not None:
2345                    rv['vtopo'] = vtopo
2346                    try:
2347                        vis = self.genviz(vtopo)
2348                    except service_error, e:
2349                        self.log.debug('Problem generating visualization: %s' \
2350                                % e)
2351                        vis = None
2352                    if vis is not None:
2353                        rv['vis'] = vis
2354        if rv:
2355            rv['proof'] = proof.to_dict()
2356            return rv
2357        else:
2358            raise service_error(service_error.req, "No such experiment")
2359
2360    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2361            results):
2362        """
2363        Call OperateSegment on multiple testbeds and gather the results.
2364        op_params contains the parameters needed to contact that testbed, cert
2365        is a certificate containing the fedid to use, op is the operation,
2366        testbeds is a dict mapping testbed name to targets in that testbed,
2367        params are the parameters to include a,d results is a growing list of
2368        the results of the calls.
2369        """
2370        try:
2371            tmpdir = tempfile.mkdtemp(prefix="info-")
2372        except EnvironmentError:
2373            raise service_error(service_error.internal, 
2374                    "Cannot create tmp dir")
2375        cert_file = self.make_temp_certfile(cert, tmpdir)
2376
2377        try:
2378            for tb, targets in testbeds.items():
2379                if tb in op_params:
2380                    uri, aid = op_params[tb]
2381                    operate=self.operation_segment(log=self.log, testbed=uri,
2382                                cert_file=cert_file, cert_pwd=None,
2383                                trusted_certs=self.trusted_certs,
2384                                caller=self.call_OperationSegment)
2385                    if operate(uri, aid, op, targets, params):
2386                        if operate.status is not None:
2387                            results.extend(operate.status)
2388                            continue
2389                # Something went wrong in a weird way.  Add statuses
2390                # that reflect that to results
2391                for t in targets:
2392                    results.append(operation_status(t, 
2393                        operation_status.federant,
2394                        'Unexpected error on %s' % tb))
2395        # Clean up the tmpdir no matter what
2396        finally:
2397            if tmpdir: self.remove_dirs(tmpdir)
2398
2399    def do_operation(self, req, fid):
2400        """
2401        Find the testbeds holding each target and ask them to carry out the
2402        operation.  Return the statuses.
2403        """
2404        # Map an element to the testbed containing it
2405        def element_to_tb(e):
2406            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2407            elif isinstance(e, topdl.Testbed): return e.name
2408            else: return None
2409        # If d is an operation_status object, make it a dict
2410        def make_dict(d):
2411            if isinstance(d, dict): return d
2412            elif isinstance(d, operation_status): return d.to_dict()
2413            else: return { }
2414
2415        def element_name(e):
2416            if isinstance(e, topdl.Computer): return e.name
2417            elif isinstance(e, topdl.Testbed): 
2418                if e.localname: return e.localname[0]
2419                else: return None
2420            else: return None
2421
2422        req = req.get('OperationRequestBody', None)
2423        if not req:
2424            raise service_error(service_error.req,
2425                    "Bad request format (no OperationRequestBody)")
2426        exp = req.get('experiment', None)
2427        op = req.get('operation', None)
2428        targets = set(req.get('target', []))
2429        params = req.get('parameter', None)
2430
2431        if exp:
2432            if 'fedid' in exp:
2433                key = exp['fedid']
2434                keytype = "fedid"
2435            elif 'localname' in exp:
2436                key = exp['localname']
2437                keytype = "localname"
2438            else:
2439                raise service_error(service_error.req, "Unknown lookup type")
2440        else:
2441            raise service_error(service_error.req, "No request?")
2442
2443        if op is None or not targets:
2444            raise service_error(service_error.req, "No request?")
2445
2446        proof = self.check_experiment_access(fid, key)
2447        self.state_lock.acquire()
2448        if key in self.state:
2449            d1, op_params, cert, d2 = \
2450                    self.get_segment_info(self.state[key], need_lock=False,
2451                            key='tb')
2452            top = self.state[key].top
2453            if top is not None:
2454                top = top.clone()
2455        self.state_lock.release()
2456
2457        if top is None:
2458            raise service_error(service_error.partial, "No topology yet", 
2459                    proof=proof)
2460
2461        testbeds = { }
2462        results = []
2463        for e in top.elements:
2464            ename = element_name(e)
2465            if ename in targets:
2466                tb = element_to_tb(e)
2467                targets.remove(ename)
2468                if tb is not None:
2469                    if tb in testbeds: testbeds[tb].append(ename)
2470                    else: testbeds[tb] = [ ename ]
2471                else:
2472                    results.append(operation_status(e.name, 
2473                        code=operation_status.no_target, 
2474                        description='Cannot map target to testbed'))
2475
2476        for t in targets:
2477            results.append(operation_status(t, operation_status.no_target))
2478
2479        self.operate_on_segments(op_params, cert, op, testbeds, params,
2480                results)
2481
2482        return { 
2483                'experiment': exp, 
2484                'status': [make_dict(r) for r in results],
2485                'proof': proof.to_dict()
2486                }
2487
2488
2489    def get_multi_info(self, req, fid):
2490        """
2491        Return all the stored info that this fedid can access
2492        """
2493        rv = { 'info': [ ], 'proof': [ ] }
2494
2495        self.state_lock.acquire()
2496        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2497            try:
2498                proof = self.check_experiment_access(fid, key)
2499            except service_error, e:
2500                if e.code == service_error.access:
2501                    continue
2502                else:
2503                    self.state_lock.release()
2504                    raise e
2505
2506            if self.state.has_key(key):
2507                e = self.state[key].get_info()
2508                e['proof'] = proof.to_dict()
2509                rv['info'].append(e)
2510                rv['proof'].append(proof.to_dict())
2511        self.state_lock.release()
2512        return rv
2513
2514    def check_termination_status(self, fed_exp, force):
2515        """
2516        Confirm that the experiment is sin a valid state to stop (or force it)
2517        return the state - invalid states for deletion and force settings cause
2518        exceptions.
2519        """
2520        self.state_lock.acquire()
2521        status = fed_exp.status
2522
2523        if status:
2524            if status in ('starting', 'terminating'):
2525                if not force:
2526                    self.state_lock.release()
2527                    raise service_error(service_error.partial, 
2528                            'Experiment still being created or destroyed')
2529                else:
2530                    self.log.warning('Experiment in %s state ' % status + \
2531                            'being terminated by force.')
2532            self.state_lock.release()
2533            return status
2534        else:
2535            # No status??? trouble
2536            self.state_lock.release()
2537            raise service_error(service_error.internal,
2538                    "Experiment has no status!?")
2539
2540    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2541        ids = []
2542        term_params = { }
2543        if need_lock: self.state_lock.acquire()
2544        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2545        expcert = fed_exp.identity
2546        repo = "%s" % fed_exp.fedid
2547
2548        # Collect the allocation/segment ids into a dict keyed by the fedid
2549        # of the allocation that contains a tuple of uri, aid
2550        for i, fed in enumerate(fed_exp.get_all_allocations()):
2551            uri = fed.uri
2552            aid = fed.allocID
2553            if key == 'aid': term_params[aid] = (uri, aid)
2554            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2555
2556        if need_lock: self.state_lock.release()
2557        return ids, term_params, expcert, repo
2558
2559
2560    def get_termination_info(self, fed_exp):
2561        self.state_lock.acquire()
2562        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2563        # Change the experiment state
2564        fed_exp.status = 'terminating'
2565        fed_exp.updated()
2566        if self.state_filename: self.write_state()
2567        self.state_lock.release()
2568
2569        return ids, term_params, expcert, repo
2570
2571
2572    def deallocate_resources(self, term_params, expcert, status, force, 
2573            dealloc_log):
2574        tmpdir = None
2575        # This try block makes sure the tempdir is cleared
2576        try:
2577            # If no expcert, try the deallocation as the experiment
2578            # controller instance.
2579            if expcert and self.auth_type != 'legacy': 
2580                try:
2581                    tmpdir = tempfile.mkdtemp(prefix="term-")
2582                except EnvironmentError:
2583                    raise service_error(service_error.internal, 
2584                            "Cannot create tmp dir")
2585                cert_file = self.make_temp_certfile(expcert, tmpdir)
2586                pw = None
2587            else: 
2588                cert_file = self.cert_file
2589                pw = self.cert_pwd
2590
2591            # Stop everyone.  NB, wait_for_all waits until a thread starts
2592            # and then completes, so we can't wait if nothing starts.  So,
2593            # no tbparams, no start.
2594            if len(term_params) > 0:
2595                tp = thread_pool(self.nthreads)
2596                for k, (uri, aid) in term_params.items():
2597                    # Create and start a thread to stop the segment
2598                    tp.wait_for_slot()
2599                    t  = pooled_thread(\
2600                            target=self.terminate_segment(log=dealloc_log,
2601                                testbed=uri,
2602                                cert_file=cert_file, 
2603                                cert_pwd=pw,
2604                                trusted_certs=self.trusted_certs,
2605                                caller=self.call_TerminateSegment),
2606                            args=(uri, aid), name=k,
2607                            pdata=tp, trace_file=self.trace_file)
2608                    t.start()
2609                # Wait for completions
2610                tp.wait_for_all_done()
2611
2612            # release the allocations (failed experiments have done this
2613            # already, and starting experiments may be in odd states, so we
2614            # ignore errors releasing those allocations
2615            try: 
2616                for k, (uri, aid)  in term_params.items():
2617                    self.release_access(None, aid, uri=uri,
2618                            cert_file=cert_file, cert_pwd=pw)
2619            except service_error, e:
2620                if status != 'failed' and not force:
2621                    raise e
2622
2623        # Clean up the tmpdir no matter what
2624        finally:
2625            if tmpdir: self.remove_dirs(tmpdir)
2626
2627    def terminate_experiment(self, req, fid):
2628        """
2629        Swap this experiment out on the federants and delete the shared
2630        information
2631        """
2632        tbparams = { }
2633        req = req.get('TerminateRequestBody', None)
2634        if not req:
2635            raise service_error(service_error.req,
2636                    "Bad request format (no TerminateRequestBody)")
2637
2638        key = self.get_experiment_key(req, 'experiment')
2639        proof = self.check_experiment_access(fid, key)
2640        exp = req.get('experiment', False)
2641        force = req.get('force', False)
2642
2643        dealloc_list = [ ]
2644
2645
2646        # Create a logger that logs to the dealloc_list as well as to the main
2647        # log file.
2648        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2649        h = logging.StreamHandler(self.list_log(dealloc_list))
2650        # XXX: there should be a global one of these rather than repeating the
2651        # code.
2652        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2653                    '%d %b %y %H:%M:%S'))
2654        dealloc_log.addHandler(h)
2655
2656        self.state_lock.acquire()
2657        fed_exp = self.state.get(key, None)
2658        self.state_lock.release()
2659        repo = None
2660
2661        if fed_exp:
2662            status = self.check_termination_status(fed_exp, force)
2663            # get_termination_info updates the experiment state
2664            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2665            self.deallocate_resources(term_params, expcert, status, force, 
2666                    dealloc_log)
2667
2668            # Remove the terminated experiment
2669            self.state_lock.acquire()
2670            for id in ids:
2671                self.clear_experiment_authorization(id, need_state_lock=False)
2672                if id in self.state: del self.state[id]
2673
2674            if self.state_filename: self.write_state()
2675            self.state_lock.release()
2676
2677            # Delete any synch points associated with this experiment.  All
2678            # synch points begin with the fedid of the experiment.
2679            fedid_keys = set(["fedid:%s" % f for f in ids \
2680                    if isinstance(f, fedid)])
2681            for k in self.synch_store.all_keys():
2682                try:
2683                    if len(k) > 45 and k[0:46] in fedid_keys:
2684                        self.synch_store.del_value(k)
2685                except synch_store.BadDeletionError:
2686                    pass
2687            self.write_store()
2688
2689            # Remove software and other cached stuff from the filesystem.
2690            if repo:
2691                self.remove_dirs("%s/%s" % (self.repodir, repo))
2692       
2693            return { 
2694                    'experiment': exp , 
2695                    'deallocationLog': string.join(dealloc_list, ''),
2696                    'proof': [proof.to_dict()],
2697                    }
2698        else:
2699            raise service_error(service_error.req, "No saved state")
2700
2701
2702    def GetValue(self, req, fid):
2703        """
2704        Get a value from the synchronized store
2705        """
2706        req = req.get('GetValueRequestBody', None)
2707        if not req:
2708            raise service_error(service_error.req,
2709                    "Bad request format (no GetValueRequestBody)")
2710       
2711        name = req.get('name', None)
2712        wait = req.get('wait', False)
2713        rv = { 'name': name }
2714
2715        if not name:
2716            raise service_error(service_error.req, "No name?")
2717
2718        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2719
2720        if access_ok:
2721            self.log.debug("[GetValue] asking for %s " % name)
2722            try:
2723                v = self.synch_store.get_value(name, wait)
2724            except synch_store.RevokedKeyError:
2725                # No more synch on this key
2726                raise service_error(service_error.federant, 
2727                        "Synch key %s revoked" % name)
2728            if v is not None:
2729                rv['value'] = v
2730            rv['proof'] = proof.to_dict()
2731            self.log.debug("[GetValue] got %s from %s" % (v, name))
2732            return rv
2733        else:
2734            raise service_error(service_error.access, "Access Denied",
2735                    proof=proof)
2736       
2737
2738    def SetValue(self, req, fid):
2739        """
2740        Set a value in the synchronized store
2741        """
2742        req = req.get('SetValueRequestBody', None)
2743        if not req:
2744            raise service_error(service_error.req,
2745                    "Bad request format (no SetValueRequestBody)")
2746       
2747        name = req.get('name', None)
2748        v = req.get('value', '')
2749
2750        if not name:
2751            raise service_error(service_error.req, "No name?")
2752
2753        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2754
2755        if access_ok:
2756            try:
2757                self.synch_store.set_value(name, v)
2758                self.write_store()
2759                self.log.debug("[SetValue] set %s to %s" % (name, v))
2760            except synch_store.CollisionError:
2761                # Translate into a service_error
2762                raise service_error(service_error.req,
2763                        "Value already set: %s" %name)
2764            except synch_store.RevokedKeyError:
2765                # No more synch on this key
2766                raise service_error(service_error.federant, 
2767                        "Synch key %s revoked" % name)
2768                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2769        else:
2770            raise service_error(service_error.access, "Access Denied",
2771                    proof=proof)
Note: See TracBrowser for help on using the repository browser.