source: fedd/federation/experiment_control.py @ b709861

compt_changesinfo-ops
Last change on this file since b709861 was b709861, checked in by Ted Faber <faber@…>, 12 years ago

Rebooting works

  • Property mode set to 100644
File size: 91.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41import topdl
42import list_log
43from ip_allocator import ip_allocator
44from ip_addr import ip_addr
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        """
388
389        self.tbmap = { }
390        lineno =0
391        try:
392            f = open(file, "r")
393            for line in f:
394                lineno += 1
395                line = line.strip()
396                if line.startswith('#') or len(line) == 0:
397                    continue
398                try:
399                    label, url = line.split(':', 1)
400                    self.tbmap[label] = url
401                except ValueError, e:
402                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
403                            "map db: %s %s" % (lineno, line, e))
404        except EnvironmentError, e:
405            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
406                    "open %s: %s" % (file, e))
407        else:
408            f.close()
409
410    def read_store(self):
411        try:
412            self.synch_store = synch_store()
413            self.synch_store.load(self.store_filename)
414            self.log.debug("[read_store]: Read store from %s" % \
415                    self.store_filename)
416        except EnvironmentError, e:
417            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
418                    % (self.state_filename, e))
419            self.synch_store = synch_store()
420
421        # Set the initial permissions on data in the store.  XXX: This ad hoc
422        # authorization attribute initialization is getting out of hand.
423        # XXX: legacy
424        if self.auth_type == 'legacy':
425            for k in self.synch_store.all_keys():
426                try:
427                    if k.startswith('fedid:'):
428                        fid = fedid(hexstr=k[6:46])
429                        if self.state.has_key(fid):
430                            for a in self.get_alloc_ids(self.state[fid]):
431                                self.auth.set_attribute(a, k)
432                except ValueError, e:
433                    self.log.warn("Cannot deduce permissions for %s" % k)
434
435
436    def write_store(self):
437        """
438        Write a new copy of synch_store after writing current state
439        to a backup.  We use the internal synch_store pickle method to avoid
440        incinsistent data.
441
442        State format is a simple pickling of the store.
443        """
444        if os.access(self.store_filename, os.W_OK):
445            copy_file(self.store_filename, \
446                    "%s.bak" % self.store_filename)
447        try:
448            self.synch_store.save(self.store_filename)
449        except EnvironmentError, e:
450            self.log.error("Can't write file %s: %s" % \
451                    (self.store_filename, e))
452        except TypeError, e:
453            self.log.error("Pickling problem (TypeError): %s" % e)
454
455
456    def remove_dirs(self, dir):
457        """
458        Remove the directory tree and all files rooted at dir.  Log any errors,
459        but continue.
460        """
461        self.log.debug("[removedirs]: removing %s" % dir)
462        try:
463            for path, dirs, files in os.walk(dir, topdown=False):
464                for f in files:
465                    os.remove(os.path.join(path, f))
466                for d in dirs:
467                    os.rmdir(os.path.join(path, d))
468            os.rmdir(dir)
469        except EnvironmentError, e:
470            self.log.error("Error deleting directory tree in %s" % e);
471
472    @staticmethod
473    def make_temp_certfile(expcert, tmpdir):
474        """
475        make a protected copy of the access certificate so the experiment
476        controller can act as the experiment principal.  mkstemp is the most
477        secure way to do that. The directory should be created by
478        mkdtemp.  Return the filename.
479        """
480        if expcert and tmpdir:
481            try:
482                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
483                f = os.fdopen(certf, 'w')
484                print >> f, expcert
485                f.close()
486            except EnvironmentError, e:
487                raise service_error(service_error.internal, 
488                        "Cannot create temp cert file?")
489            return certfn
490        else:
491            return None
492
493       
494    def generate_ssh_keys(self, dest, type="rsa" ):
495        """
496        Generate a set of keys for the gateways to use to talk.
497
498        Keys are of type type and are stored in the required dest file.
499        """
500        valid_types = ("rsa", "dsa")
501        t = type.lower();
502        if t not in valid_types: raise ValueError
503        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
504
505        try:
506            trace = open("/dev/null", "w")
507        except EnvironmentError:
508            raise service_error(service_error.internal,
509                    "Cannot open /dev/null??");
510
511        # May raise CalledProcessError
512        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
513        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
514        if rv != 0:
515            raise service_error(service_error.internal, 
516                    "Cannot generate nonce ssh keys.  %s return code %d" \
517                            % (self.ssh_keygen, rv))
518
519    def generate_seer_certs(self, destdir):
520        '''
521        Create a SEER ca cert and a node cert in destdir/ca.pem and
522        destdir/node.pem respectively.  These will be distributed throughout
523        the federated experiment.  This routine reports errors via
524        service_errors.
525        '''
526        openssl = '/usr/bin/openssl'
527        # All the filenames and parameters we need for openssl calls below
528        ca_key =os.path.join(destdir, 'ca.key') 
529        ca_pem = os.path.join(destdir, 'ca.pem')
530        node_key =os.path.join(destdir, 'node.key') 
531        node_pem = os.path.join(destdir, 'node.pem')
532        node_req = os.path.join(destdir, 'node.req')
533        node_signed = os.path.join(destdir, 'node.signed')
534        days = '%s' % (365 * 10)
535        serial = '%s' % random.randint(0, 1<<16)
536
537        try:
538            # Sequence of calls to create a CA key, create a ca cert, create a
539            # node key, node signing request, and finally a signed node
540            # certificate.
541            sequence = (
542                    (openssl, 'genrsa', '-out', ca_key, '1024'),
543                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
544                        ca_pem, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
546                    (openssl, 'genrsa', '-out', node_key, '1024'),
547                    (openssl, 'req', '-new', '-key', node_key, '-out', 
548                        node_req, '-days', days, '-subj', 
549                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
550                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
551                        '-set_serial', serial, '-req', '-in', node_req, 
552                        '-out', node_signed, '-days', days),
553                )
554            # Do all that stuff; bail if there's an error, and push all the
555            # output to dev/null.
556            for cmd in sequence:
557                trace = open("/dev/null", "w")
558                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
559                if rv != 0:
560                    raise service_error(service_error.internal, 
561                            "Cannot generate SEER certs.  %s return code %d" \
562                                    % (' '.join(cmd), rv))
563            # Concatinate the node key and signed certificate into node.pem
564            f = open(node_pem, 'w')
565            for comp in (node_signed, node_key):
566                g = open(comp, 'r')
567                f.write(g.read())
568                g.close()
569            f.close()
570
571            # Throw out intermediaries.
572            for fn in (ca_key, node_key, node_req, node_signed):
573                os.unlink(fn)
574
575        except EnvironmentError, e:
576            # Any difficulties with the file system wind up here
577            raise service_error(service_error.internal,
578                    "File error on  %s while creating SEER certs: %s" % \
579                            (e.filename, e.strerror))
580
581
582
583    def gentopo(self, str):
584        """
585        Generate the topology data structure from the splitter's XML
586        representation of it.
587
588        The topology XML looks like:
589            <experiment>
590                <nodes>
591                    <node><vname></vname><ips>ip1:ip2</ips></node>
592                </nodes>
593                <lans>
594                    <lan>
595                        <vname></vname><vnode></vnode><ip></ip>
596                        <bandwidth></bandwidth><member>node:port</member>
597                    </lan>
598                </lans>
599        """
600        class topo_parse:
601            """
602            Parse the topology XML and create the dats structure.
603            """
604            def __init__(self):
605                # Typing of the subelements for data conversion
606                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
607                self.int_subelements = ( 'bandwidth',)
608                self.float_subelements = ( 'delay',)
609                # The final data structure
610                self.nodes = [ ]
611                self.lans =  [ ]
612                self.topo = { \
613                        'node': self.nodes,\
614                        'lan' : self.lans,\
615                    }
616                self.element = { }  # Current element being created
617                self.chars = ""     # Last text seen
618
619            def end_element(self, name):
620                # After each sub element the contents is added to the current
621                # element or to the appropriate list.
622                if name == 'node':
623                    self.nodes.append(self.element)
624                    self.element = { }
625                elif name == 'lan':
626                    self.lans.append(self.element)
627                    self.element = { }
628                elif name in self.str_subelements:
629                    self.element[name] = self.chars
630                    self.chars = ""
631                elif name in self.int_subelements:
632                    self.element[name] = int(self.chars)
633                    self.chars = ""
634                elif name in self.float_subelements:
635                    self.element[name] = float(self.chars)
636                    self.chars = ""
637
638            def found_chars(self, data):
639                self.chars += data.rstrip()
640
641
642        tp = topo_parse();
643        parser = xml.parsers.expat.ParserCreate()
644        parser.EndElementHandler = tp.end_element
645        parser.CharacterDataHandler = tp.found_chars
646
647        parser.Parse(str)
648
649        return tp.topo
650       
651
652    def genviz(self, topo):
653        """
654        Generate the visualization the virtual topology
655        """
656
657        neato = "/usr/local/bin/neato"
658        # These are used to parse neato output and to create the visualization
659        # file.
660        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
661        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
662                "%s</type></node>"
663
664        try:
665            # Node names
666            nodes = [ n['vname'] for n in topo['node'] ]
667            topo_lans = topo['lan']
668        except KeyError, e:
669            raise service_error(service_error.internal, "Bad topology: %s" %e)
670
671        lans = { }
672        links = { }
673
674        # Walk through the virtual topology, organizing the connections into
675        # 2-node connections (links) and more-than-2-node connections (lans).
676        # When a lan is created, it's added to the list of nodes (there's a
677        # node in the visualization for the lan).
678        for l in topo_lans:
679            if links.has_key(l['vname']):
680                if len(links[l['vname']]) < 2:
681                    links[l['vname']].append(l['vnode'])
682                else:
683                    nodes.append(l['vname'])
684                    lans[l['vname']] = links[l['vname']]
685                    del links[l['vname']]
686                    lans[l['vname']].append(l['vnode'])
687            elif lans.has_key(l['vname']):
688                lans[l['vname']].append(l['vnode'])
689            else:
690                links[l['vname']] = [ l['vnode'] ]
691
692
693        # Open up a temporary file for dot to turn into a visualization
694        try:
695            df, dotname = tempfile.mkstemp()
696            dotfile = os.fdopen(df, 'w')
697        except EnvironmentError:
698            raise service_error(service_error.internal,
699                    "Failed to open file in genviz")
700
701        try:
702            dnull = open('/dev/null', 'w')
703        except EnvironmentError:
704            service_error(service_error.internal,
705                    "Failed to open /dev/null in genviz")
706
707        # Generate a dot/neato input file from the links, nodes and lans
708        try:
709            print >>dotfile, "graph G {"
710            for n in nodes:
711                print >>dotfile, '\t"%s"' % n
712            for l in links.keys():
713                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
714            for l in lans.keys():
715                for n in lans[l]:
716                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
717            print >>dotfile, "}"
718            dotfile.close()
719        except TypeError:
720            raise service_error(service_error.internal,
721                    "Single endpoint link in vtopo")
722        except EnvironmentError:
723            raise service_error(service_error.internal, "Cannot write dot file")
724
725        # Use dot to create a visualization
726        try:
727            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
728                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
729                stderr=dnull, close_fds=True)
730        except EnvironmentError:
731            raise service_error(service_error.internal, 
732                    "Cannot generate visualization: is graphviz available?")
733        dnull.close()
734
735        # Translate dot to vis format
736        vis_nodes = [ ]
737        vis = { 'node': vis_nodes }
738        for line in dot.stdout:
739            m = vis_re.match(line)
740            if m:
741                vn = m.group(1)
742                vis_node = {'name': vn, \
743                        'x': float(m.group(2)),\
744                        'y' : float(m.group(3)),\
745                    }
746                if vn in links.keys() or vn in lans.keys():
747                    vis_node['type'] = 'lan'
748                else:
749                    vis_node['type'] = 'node'
750                vis_nodes.append(vis_node)
751        rv = dot.wait()
752
753        os.remove(dotname)
754        # XXX: graphviz seems to use low return codes for warnings, like
755        # "couldn't find font"
756        if rv < 2 : return vis
757        else: return None
758
759
760    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
761            cert_pwd=None):
762        """
763        Release access to testbed through fedd
764        """
765
766        if not uri and tbmap:
767            uri = tbmap.get(tb, None)
768        if not uri:
769            raise service_error(service_error.server_config, 
770                    "Unknown testbed: %s" % tb)
771
772        if self.local_access.has_key(uri):
773            resp = self.local_access[uri].ReleaseAccess(\
774                    { 'ReleaseAccessRequestBody' : 
775                        {'allocID': {'fedid': aid}},}, 
776                    fedid(file=cert_file))
777            resp = { 'ReleaseAccessResponseBody': resp } 
778        else:
779            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
780                    cert_file, cert_pwd, self.trusted_certs)
781
782        # better error coding
783
784    def remote_ns2topdl(self, uri, desc):
785
786        req = {
787                'description' : { 'ns2description': desc },
788            }
789
790        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
791                self.trusted_certs)
792
793        if r.has_key('Ns2TopdlResponseBody'):
794            r = r['Ns2TopdlResponseBody']
795            ed = r.get('experimentdescription', None)
796            if ed.has_key('topdldescription'):
797                return topdl.Topology(**ed['topdldescription'])
798            else:
799                raise service_error(service_error.protocol, 
800                        "Bad splitter response (no output)")
801        else:
802            raise service_error(service_error.protocol, "Bad splitter response")
803
804    class start_segment:
805        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
806                cert_pwd=None, trusted_certs=None, caller=None,
807                log_collector=None):
808            self.log = log
809            self.debug = debug
810            self.cert_file = cert_file
811            self.cert_pwd = cert_pwd
812            self.trusted_certs = None
813            self.caller = caller
814            self.testbed = testbed
815            self.log_collector = log_collector
816            self.response = None
817            self.node = { }
818            self.proof = None
819
820        def make_map(self, resp):
821            if 'segmentdescription' not in resp  or \
822                    'topdldescription' not in resp['segmentdescription']:
823                self.log.warn('No topology returned from startsegment')
824                return 
825
826            top = topdl.Topology(
827                    **resp['segmentdescription']['topdldescription'])
828
829            for e in top.elements:
830                if isinstance(e, topdl.Computer):
831                    self.node[e.name] = \
832                            (e.localname, e.status, e.service, e.operation)
833
834        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
835            req = {
836                    'allocID': { 'fedid' : aid }, 
837                    'segmentdescription': { 
838                        'topdldescription': topo.to_dict(),
839                    },
840                }
841
842            if connInfo:
843                req['connection'] = connInfo
844
845            import_svcs = [ s for m in masters.values() \
846                    for s in m if self.testbed in s.importers]
847
848            if import_svcs or self.testbed in masters:
849                req['service'] = []
850
851            for s in import_svcs:
852                for r in s.reqs:
853                    sr = copy.deepcopy(r)
854                    sr['visibility'] = 'import';
855                    req['service'].append(sr)
856
857            for s in masters.get(self.testbed, []):
858                for r in s.reqs:
859                    sr = copy.deepcopy(r)
860                    sr['visibility'] = 'export';
861                    req['service'].append(sr)
862
863            if attrs:
864                req['fedAttr'] = attrs
865
866            try:
867                self.log.debug("Calling StartSegment at %s " % uri)
868                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
869                        self.trusted_certs)
870                if r.has_key('StartSegmentResponseBody'):
871                    lval = r['StartSegmentResponseBody'].get('allocationLog',
872                            None)
873                    if lval and self.log_collector:
874                        for line in  lval.splitlines(True):
875                            self.log_collector.write(line)
876                    self.make_map(r['StartSegmentResponseBody'])
877                    if 'proof' in r: self.proof = r['proof']
878                    self.response = r
879                else:
880                    raise service_error(service_error.internal, 
881                            "Bad response!?: %s" %r)
882                return True
883            except service_error, e:
884                self.log.error("Start segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888
889
890    class terminate_segment:
891        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
892                cert_pwd=None, trusted_certs=None, caller=None):
893            self.log = log
894            self.debug = debug
895            self.cert_file = cert_file
896            self.cert_pwd = cert_pwd
897            self.trusted_certs = None
898            self.caller = caller
899            self.testbed = testbed
900
901        def __call__(self, uri, aid ):
902            req = {
903                    'allocID': {'fedid': aid }, 
904                }
905            try:
906                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
907                        self.trusted_certs)
908                return True
909            except service_error, e:
910                self.log.error("Terminate segment failed on %s: %s" % \
911                        (self.testbed, e))
912                return False
913
914    class info_segment(start_segment):
915        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
916                cert_pwd=None, trusted_certs=None, caller=None,
917                log_collector=None):
918            experiment_control_local.start_segment.__init__(self, debug, 
919                    log, testbed, cert_file, cert_pwd, trusted_certs, 
920                    caller, log_collector)
921
922        def __call__(self, uri, aid):
923            req = { 'allocID': { 'fedid' : aid } }
924
925            try:
926                self.log.debug("Calling InfoSegment at %s " % uri)
927                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
928                        self.trusted_certs)
929                if r.has_key('InfoSegmentResponseBody'):
930                    self.make_map(r['InfoSegmentResponseBody'])
931                    if 'proof' in r: self.proof = r['proof']
932                    self.response = r
933                else:
934                    raise service_error(service_error.internal, 
935                            "Bad response!?: %s" %r)
936                return True
937            except service_error, e:
938                self.log.error("Info segment failed on %s: %s" % \
939                        (self.testbed, e))
940                return False
941
942    class operation_segment:
943        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
944                cert_pwd=None, trusted_certs=None, caller=None,
945                log_collector=None):
946            self.log = log
947            self.debug = debug
948            self.cert_file = cert_file
949            self.cert_pwd = cert_pwd
950            self.trusted_certs = None
951            self.caller = caller
952            self.testbed = testbed
953            self.status = None
954
955        def __call__(self, uri, aid, op, targets, params):
956            req = { 
957                    'allocID': { 'fedid' : aid },
958                    'operation': op,
959                    'target': targets,
960                    }
961            if params: req['parameter'] = params
962
963
964            try:
965                self.log.debug("Calling OperationSegment at %s " % uri)
966                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
967                        self.trusted_certs)
968                if 'OperationSegmentResponseBody' in r:
969                    r = r['OperationSegmentResponseBody']
970                    if 'status' in r:
971                        self.status = r['status']
972                else:
973                    raise service_error(service_error.internal, 
974                            "Bad response!?: %s" %r)
975                return True
976            except service_error, e:
977                self.log.error("Operation segment failed on %s: %s" % \
978                        (self.testbed, e))
979                return False
980
981    def annotate_topology(self, top, data):
982        def add_new(ann, attr):
983            for a in ann:
984                if a not in attr: attr.append(a)
985
986        # Annotate the topology with embedding info
987        for e in top.elements:
988            if isinstance(e, topdl.Computer):
989                for s in data:
990                    ann = s.node.get(e.name, None)
991                    if ann is not None:
992                        add_new(ann[0], e.localname)
993                        e.status = ann[1]
994                        add_new(ann[2], e.service)
995                        add_new(ann[3], e.operation)
996                        break
997
998
999
1000    def allocate_resources(self, allocated, masters, eid, expid, 
1001            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1002            attrs=None, connInfo={}, tbmap=None, expcert=None):
1003
1004        started = { }           # Testbeds where a sub-experiment started
1005                                # successfully
1006
1007        # XXX
1008        fail_soft = False
1009
1010        if tbmap is None: tbmap = { }
1011
1012        log = alloc_log or self.log
1013
1014        tp = thread_pool(self.nthreads)
1015        threads = [ ]
1016        starters = [ ]
1017
1018        if expcert:
1019            cert = expcert
1020            pw = None
1021        else:
1022            cert = self.cert_file
1023            pw = self.cert_pwd
1024
1025        for tb in allocated.keys():
1026            # Create and start a thread to start the segment, and save it
1027            # to get the return value later
1028            tb_attrs = copy.copy(attrs)
1029            tp.wait_for_slot()
1030            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1031            base, suffix = split_testbed(tb)
1032            if suffix:
1033                tb_attrs.append({'attribute': 'experiment_name', 
1034                    'value': "%s-%s" % (eid, suffix)})
1035            else:
1036                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1037            if not uri:
1038                raise service_error(service_error.internal, 
1039                        "Unknown testbed %s !?" % tb)
1040
1041            aid = tbparams[tb].allocID
1042            if not aid:
1043                raise service_error(service_error.internal, 
1044                        "No alloc id for testbed %s !?" % tb)
1045
1046            s = self.start_segment(log=log, debug=self.debug,
1047                    testbed=tb, cert_file=cert,
1048                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1049                    caller=self.call_StartSegment,
1050                    log_collector=log_collector)
1051            starters.append(s)
1052            t  = pooled_thread(\
1053                    target=s, name=tb,
1054                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1055                    pdata=tp, trace_file=self.trace_file)
1056            threads.append(t)
1057            t.start()
1058
1059        # Wait until all finish (keep pinging the log, though)
1060        mins = 0
1061        revoked = False
1062        while not tp.wait_for_all_done(60.0):
1063            mins += 1
1064            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1065                    % mins)
1066            if not revoked and \
1067                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1068                # a testbed has failed.  Revoke this experiment's
1069                # synchronizarion values so that sub experiments will not
1070                # deadlock waiting for synchronization that will never happen
1071                self.log.info("A subexperiment has failed to swap in, " + \
1072                        "revoking synch keys")
1073                var_key = "fedid:%s" % expid
1074                for k in self.synch_store.all_keys():
1075                    if len(k) > 45 and k[0:46] == var_key:
1076                        self.synch_store.revoke_key(k)
1077                revoked = True
1078
1079        failed = [ t.getName() for t in threads if not t.rv ]
1080        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1081
1082        # If one failed clean up, unless fail_soft is set
1083        if failed:
1084            if not fail_soft:
1085                tp.clear()
1086                for tb in succeeded:
1087                    # Create and start a thread to stop the segment
1088                    tp.wait_for_slot()
1089                    uri = tbparams[tb].uri
1090                    t  = pooled_thread(\
1091                            target=self.terminate_segment(log=log,
1092                                testbed=tb,
1093                                cert_file=cert, 
1094                                cert_pwd=pw,
1095                                trusted_certs=self.trusted_certs,
1096                                caller=self.call_TerminateSegment),
1097                            args=(uri, tbparams[tb].allocID),
1098                            name=tb,
1099                            pdata=tp, trace_file=self.trace_file)
1100                    t.start()
1101                # Wait until all finish (if any are being stopped)
1102                if succeeded:
1103                    tp.wait_for_all_done()
1104
1105                # release the allocations
1106                for tb in tbparams.keys():
1107                    try:
1108                        self.release_access(tb, tbparams[tb].allocID, 
1109                                tbmap=tbmap, uri=tbparams[tb].uri,
1110                                cert_file=cert, cert_pwd=pw)
1111                    except service_error, e:
1112                        self.log.warn("Error releasing access: %s" % e.desc)
1113                # Remove the placeholder
1114                self.state_lock.acquire()
1115                self.state[eid].status = 'failed'
1116                self.state[eid].updated()
1117                if self.state_filename: self.write_state()
1118                self.state_lock.release()
1119                # Remove the repo dir
1120                self.remove_dirs("%s/%s" %(self.repodir, expid))
1121                # Walk up tmpdir, deleting as we go
1122                if self.cleanup:
1123                    self.remove_dirs(tmpdir)
1124                else:
1125                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1126
1127
1128                log.error("Swap in failed on %s" % ",".join(failed))
1129                return
1130        else:
1131            # Walk through the successes and gather the proofs
1132            proofs = { }
1133            for s in starters:
1134                if s.proof:
1135                    proofs[s.testbed] = s.proof
1136            self.annotate_topology(top, starters)
1137            log.info("[start_segment]: Experiment %s active" % eid)
1138
1139
1140        # Walk up tmpdir, deleting as we go
1141        if self.cleanup:
1142            self.remove_dirs(tmpdir)
1143        else:
1144            log.debug("[start_experiment]: not removing %s" % tmpdir)
1145
1146        # Insert the experiment into our state and update the disk copy.
1147        self.state_lock.acquire()
1148        self.state[expid].status = 'active'
1149        self.state[eid] = self.state[expid]
1150        self.state[eid].top = top
1151        self.state[eid].updated()
1152        # Append startup proofs
1153        for f in self.state[eid].get_all_allocations():
1154            if f.tb in proofs:
1155                f.proof.append(proofs[f.tb])
1156
1157        if self.state_filename: self.write_state()
1158        self.state_lock.release()
1159        return
1160
1161
1162    def add_kit(self, e, kit):
1163        """
1164        Add a Software object created from the list of (install, location)
1165        tuples passed as kit  to the software attribute of an object e.  We
1166        do this enough to break out the code, but it's kind of a hack to
1167        avoid changing the old tuple rep.
1168        """
1169
1170        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1171
1172        if isinstance(e.software, list): e.software.extend(s)
1173        else: e.software = s
1174
1175    def append_experiment_authorization(self, expid, attrs, 
1176            need_state_lock=True):
1177        """
1178        Append the authorization information to system state
1179        """
1180
1181        for p, a in attrs:
1182            self.auth.set_attribute(p, a)
1183        self.auth.save()
1184
1185        if need_state_lock: self.state_lock.acquire()
1186        # XXX: really a no op?
1187        #self.state[expid]['auth'].update(attrs)
1188        if self.state_filename: self.write_state()
1189        if need_state_lock: self.state_lock.release()
1190
1191    def clear_experiment_authorization(self, expid, need_state_lock=True):
1192        """
1193        Attrs is a set of attribute principal pairs that need to be removed
1194        from the authenticator.  Remove them and save the authenticator.
1195        """
1196
1197        if need_state_lock: self.state_lock.acquire()
1198        # XXX: should be a no-op
1199        #if expid in self.state and 'auth' in self.state[expid]:
1200            #for p, a in self.state[expid]['auth']:
1201                #self.auth.unset_attribute(p, a)
1202            #self.state[expid]['auth'] = set()
1203        if self.state_filename: self.write_state()
1204        if need_state_lock: self.state_lock.release()
1205        self.auth.save()
1206
1207
1208    def create_experiment_state(self, fid, req, expid, expcert,
1209            state='starting'):
1210        """
1211        Create the initial entry in the experiment's state.  The expid and
1212        expcert are the experiment's fedid and certifacte that represents that
1213        ID, which are installed in the experiment state.  If the request
1214        includes a suggested local name that is used if possible.  If the local
1215        name is already taken by an experiment owned by this user that has
1216        failed, it is overwritten.  Otherwise new letters are added until a
1217        valid localname is found.  The generated local name is returned.
1218        """
1219
1220        if req.has_key('experimentID') and \
1221                req['experimentID'].has_key('localname'):
1222            overwrite = False
1223            eid = req['experimentID']['localname']
1224            # If there's an old failed experiment here with the same local name
1225            # and accessible by this user, we'll overwrite it, otherwise we'll
1226            # fall through and do the collision avoidance.
1227            old_expid = self.get_experiment_fedid(eid)
1228            if old_expid:
1229                users_experiment = True
1230                try:
1231                    self.check_experiment_access(fid, old_expid)
1232                except service_error, e:
1233                    if e.code == service_error.access: users_experiment = False
1234                    else: raise e
1235                if users_experiment:
1236                    self.state_lock.acquire()
1237                    status = self.state[eid].status
1238                    if status and status == 'failed':
1239                        # remove the old access attributes
1240                        self.clear_experiment_authorization(eid,
1241                                need_state_lock=False)
1242                        overwrite = True
1243                        del self.state[eid]
1244                        del self.state[old_expid]
1245                    self.state_lock.release()
1246                else:
1247                    self.log.info('Experiment %s exists, ' % eid + \
1248                            'but this user cannot access it')
1249            self.state_lock.acquire()
1250            while (self.state.has_key(eid) and not overwrite):
1251                eid += random.choice(string.ascii_letters)
1252            # Initial state
1253            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1254                    identity=expcert)
1255            self.state[expid] = self.state[eid]
1256            if self.state_filename: self.write_state()
1257            self.state_lock.release()
1258        else:
1259            eid = self.exp_stem
1260            for i in range(0,5):
1261                eid += random.choice(string.ascii_letters)
1262            self.state_lock.acquire()
1263            while (self.state.has_key(eid)):
1264                eid = self.exp_stem
1265                for i in range(0,5):
1266                    eid += random.choice(string.ascii_letters)
1267            # Initial state
1268            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1269                    identity=expcert)
1270            self.state[expid] = self.state[eid]
1271            if self.state_filename: self.write_state()
1272            self.state_lock.release()
1273
1274        # Let users touch the state.  Authorize this fid and the expid itself
1275        # to touch the experiment, as well as allowing th eoverrides.
1276        self.append_experiment_authorization(eid, 
1277                set([(fid, expid), (expid,expid)] + \
1278                        [ (o, expid) for o in self.overrides]))
1279
1280        return eid
1281
1282
1283    def allocate_ips_to_topo(self, top):
1284        """
1285        Add an ip4_address attribute to all the hosts in the topology, based on
1286        the shared substrates on which they sit.  An /etc/hosts file is also
1287        created and returned as a list of hostfiles entries.  We also return
1288        the allocator, because we may need to allocate IPs to portals
1289        (specifically DRAGON portals).
1290        """
1291        subs = sorted(top.substrates, 
1292                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1293                reverse=True)
1294        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1295        ifs = { }
1296        hosts = [ ]
1297
1298        for idx, s in enumerate(subs):
1299            net_size = len(s.interfaces)+2
1300
1301            a = ips.allocate(net_size)
1302            if a :
1303                base, num = a
1304                if num < net_size: 
1305                    raise service_error(service_error.internal,
1306                            "Allocator returned wrong number of IPs??")
1307            else:
1308                raise service_error(service_error.req, 
1309                        "Cannot allocate IP addresses")
1310            mask = ips.min_alloc
1311            while mask < net_size:
1312                mask *= 2
1313
1314            netmask = ((2**32-1) ^ (mask-1))
1315
1316            base += 1
1317            for i in s.interfaces:
1318                i.attribute.append(
1319                        topdl.Attribute('ip4_address', 
1320                            "%s" % ip_addr(base)))
1321                i.attribute.append(
1322                        topdl.Attribute('ip4_netmask', 
1323                            "%s" % ip_addr(int(netmask))))
1324
1325                hname = i.element.name
1326                if ifs.has_key(hname):
1327                    hosts.append("%s\t%s-%s %s-%d" % \
1328                            (ip_addr(base), hname, s.name, hname,
1329                                ifs[hname]))
1330                else:
1331                    ifs[hname] = 0
1332                    hosts.append("%s\t%s-%s %s-%d %s" % \
1333                            (ip_addr(base), hname, s.name, hname,
1334                                ifs[hname], hname))
1335
1336                ifs[hname] += 1
1337                base += 1
1338        return hosts, ips
1339
1340    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1341            tbparam, masters, tbmap, expid=None, expcert=None):
1342        for tb in testbeds:
1343            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1344                    expcert)
1345            allocated[tb] = 1
1346
1347    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1348            expcert=None):
1349        """
1350        Get access to testbed through fedd and set the parameters for that tb
1351        """
1352        def get_export_project(svcs):
1353            """
1354            Look through for the list of federated_service for this testbed
1355            objects for a project_export service, and extract the project
1356            parameter.
1357            """
1358
1359            pe = [s for s in svcs if s.name=='project_export']
1360            if len(pe) == 1:
1361                return pe[0].params.get('project', None)
1362            elif len(pe) == 0:
1363                return None
1364            else:
1365                raise service_error(service_error.req,
1366                        "More than one project export is not supported")
1367
1368        def add_services(svcs, type, slist, keys):
1369            """
1370            Add the given services to slist.  type is import or export.  Also
1371            add a mapping entry from the assigned id to the original service
1372            record.
1373            """
1374            for i, s in enumerate(svcs):
1375                idx = '%s%d' % (type, i)
1376                keys[idx] = s
1377                sr = {'id': idx, 'name': s.name, 'visibility': type }
1378                if s.params:
1379                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1380                            for k, v in s.params.items()]
1381                slist.append(sr)
1382
1383        uri = tbmap.get(testbed_base(tb), None)
1384        if not uri:
1385            raise service_error(service_error.server_config, 
1386                    "Unknown testbed: %s" % tb)
1387
1388        export_svcs = masters.get(tb,[])
1389        import_svcs = [ s for m in masters.values() \
1390                for s in m \
1391                    if tb in s.importers ]
1392
1393        export_project = get_export_project(export_svcs)
1394        # Compose the credential list so that IDs come before attributes
1395        creds = set()
1396        keys = set()
1397        certs = self.auth.get_creds_for_principal(fid)
1398        if expid:
1399            certs.update(self.auth.get_creds_for_principal(expid))
1400        for c in certs:
1401            keys.add(c.issuer_cert())
1402            creds.add(c.attribute_cert())
1403        creds = list(keys) + list(creds)
1404
1405        if expcert: cert, pw = expcert, None
1406        else: cert, pw = self.cert_file, self.cert_pw
1407
1408        # Request credentials
1409        req = {
1410                'abac_credential': creds,
1411            }
1412        # Make the service request from the services we're importing and
1413        # exporting.  Keep track of the export request ids so we can
1414        # collect the resulting info from the access response.
1415        e_keys = { }
1416        if import_svcs or export_svcs:
1417            slist = []
1418            add_services(import_svcs, 'import', slist, e_keys)
1419            add_services(export_svcs, 'export', slist, e_keys)
1420            req['service'] = slist
1421
1422        if self.local_access.has_key(uri):
1423            # Local access call
1424            req = { 'RequestAccessRequestBody' : req }
1425            r = self.local_access[uri].RequestAccess(req, 
1426                    fedid(file=self.cert_file))
1427            r = { 'RequestAccessResponseBody' : r }
1428        else:
1429            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1430
1431        if r.has_key('RequestAccessResponseBody'):
1432            # Through to here we have a valid response, not a fault.
1433            # Access denied is a fault, so something better or worse than
1434            # access denied has happened.
1435            r = r['RequestAccessResponseBody']
1436            self.log.debug("[get_access] Access granted")
1437        else:
1438            raise service_error(service_error.protocol,
1439                        "Bad proxy response")
1440        if 'proof' not in r:
1441            raise service_error(service_error.protocol,
1442                        "Bad access response (no access proof)")
1443
1444        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1445                tb=tb, uri=uri, proof=[r['proof']], 
1446                services=masters.get(tb, None))
1447
1448        # Collect the responses corresponding to the services this testbed
1449        # exports.  These will be the service requests that we will include in
1450        # the start segment requests (with appropriate visibility values) to
1451        # import and export the segments.
1452        for s in r.get('service', []):
1453            id = s.get('id', None)
1454            # Note that this attaches the response to the object in the masters
1455            # data structure.  (The e_keys index disappears when this fcn
1456            # returns)
1457            if id and id in e_keys:
1458                e_keys[id].reqs.append(s)
1459
1460        # Add attributes to parameter space.  We don't allow attributes to
1461        # overlay any parameters already installed.
1462        for a in r.get('fedAttr', []):
1463            try:
1464                if a['attribute']:
1465                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1466            except KeyError:
1467                self.log.error("Bad attribute in response: %s" % a)
1468
1469
1470    def split_topology(self, top, topo, testbeds):
1471        """
1472        Create the sub-topologies that are needed for experiment instantiation.
1473        """
1474        for tb in testbeds:
1475            topo[tb] = top.clone()
1476            # copy in for loop allows deletions from the original
1477            for e in [ e for e in topo[tb].elements]:
1478                etb = e.get_attribute('testbed')
1479                # NB: elements without a testbed attribute won't appear in any
1480                # sub topologies. 
1481                if not etb or etb != tb:
1482                    for i in e.interface:
1483                        for s in i.subs:
1484                            try:
1485                                s.interfaces.remove(i)
1486                            except ValueError:
1487                                raise service_error(service_error.internal,
1488                                        "Can't remove interface??")
1489                    topo[tb].elements.remove(e)
1490            topo[tb].make_indices()
1491
1492    def confirm_software(self, top):
1493        """
1494        Make sure that the software to be loaded in the topo is all available
1495        before we begin making access requests, etc.  This is a subset of
1496        wrangle_software.
1497        """
1498        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1499        pkgs.update([x.location for e in top.elements for x in e.software])
1500
1501        for pkg in pkgs:
1502            loc = pkg
1503
1504            scheme, host, path = urlparse(loc)[0:3]
1505            dest = os.path.basename(path)
1506            if not scheme:
1507                if not loc.startswith('/'):
1508                    loc = "/%s" % loc
1509                loc = "file://%s" %loc
1510            # NB: if scheme was found, loc == pkg
1511            try:
1512                u = urlopen(loc)
1513                u.close()
1514            except Exception, e:
1515                raise service_error(service_error.req, 
1516                        "Cannot open %s: %s" % (loc, e))
1517        return True
1518
1519    def wrangle_software(self, expid, top, topo, tbparams):
1520        """
1521        Copy software out to the repository directory, allocate permissions and
1522        rewrite the segment topologies to look for the software in local
1523        places.
1524        """
1525
1526        # Copy the rpms and tarfiles to a distribution directory from
1527        # which the federants can retrieve them
1528        linkpath = "%s/software" %  expid
1529        softdir ="%s/%s" % ( self.repodir, linkpath)
1530        softmap = { }
1531
1532        # self.fedkit and self.gateway kit are lists of tuples of
1533        # (install_location, download_location) this extracts the download
1534        # locations.
1535        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1536        pkgs.update([x.location for e in top.elements for x in e.software])
1537        try:
1538            os.makedirs(softdir)
1539        except EnvironmentError, e:
1540            raise service_error(
1541                    "Cannot create software directory: %s" % e)
1542        # The actual copying.  Everything's converted into a url for copying.
1543        auth_attrs = set()
1544        for pkg in pkgs:
1545            loc = pkg
1546
1547            scheme, host, path = urlparse(loc)[0:3]
1548            dest = os.path.basename(path)
1549            if not scheme:
1550                if not loc.startswith('/'):
1551                    loc = "/%s" % loc
1552                loc = "file://%s" %loc
1553            # NB: if scheme was found, loc == pkg
1554            try:
1555                u = urlopen(loc)
1556            except Exception, e:
1557                raise service_error(service_error.req, 
1558                        "Cannot open %s: %s" % (loc, e))
1559            try:
1560                f = open("%s/%s" % (softdir, dest) , "w")
1561                self.log.debug("Writing %s/%s" % (softdir,dest) )
1562                data = u.read(4096)
1563                while data:
1564                    f.write(data)
1565                    data = u.read(4096)
1566                f.close()
1567                u.close()
1568            except Exception, e:
1569                raise service_error(service_error.internal,
1570                        "Could not copy %s: %s" % (loc, e))
1571            path = re.sub("/tmp", "", linkpath)
1572            # XXX
1573            softmap[pkg] = \
1574                    "%s/%s/%s" %\
1575                    ( self.repo_url, path, dest)
1576
1577            # Allow the individual segments to access the software by assigning
1578            # an attribute to each testbed allocation that encodes the data to
1579            # be released.  This expression collects the data for each run of
1580            # the loop.
1581            auth_attrs.update([
1582                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1583                        for tb in tbparams.keys()])
1584
1585        self.append_experiment_authorization(expid, auth_attrs)
1586
1587        # Convert the software locations in the segments into the local
1588        # copies on this host
1589        for soft in [ s for tb in topo.values() \
1590                for e in tb.elements \
1591                    if getattr(e, 'software', False) \
1592                        for s in e.software ]:
1593            if softmap.has_key(soft.location):
1594                soft.location = softmap[soft.location]
1595
1596
1597    def new_experiment(self, req, fid):
1598        """
1599        The external interface to empty initial experiment creation called from
1600        the dispatcher.
1601
1602        Creates a working directory, splits the incoming description using the
1603        splitter script and parses out the avrious subsections using the
1604        lcasses above.  Once each sub-experiment is created, use pooled threads
1605        to instantiate them and start it all up.
1606        """
1607        req = req.get('NewRequestBody', None)
1608        if not req:
1609            raise service_error(service_error.req,
1610                    "Bad request format (no NewRequestBody)")
1611
1612        if self.auth.import_credentials(data_list=req.get('credential', [])):
1613            self.auth.save()
1614
1615        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1616                with_proof=True)
1617
1618        if not access_ok:
1619            raise service_error(service_error.access, "New access denied",
1620                    proof=[proof])
1621
1622        try:
1623            tmpdir = tempfile.mkdtemp(prefix="split-")
1624        except EnvironmentError:
1625            raise service_error(service_error.internal, "Cannot create tmp dir")
1626
1627        try:
1628            access_user = self.accessdb[fid]
1629        except KeyError:
1630            raise service_error(service_error.internal,
1631                    "Access map and authorizer out of sync in " + \
1632                            "new_experiment for fedid %s"  % fid)
1633
1634        # Generate an ID for the experiment (slice) and a certificate that the
1635        # allocator can use to prove they own it.  We'll ship it back through
1636        # the encrypted connection.  If the requester supplied one, use it.
1637        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1638            expcert = req['experimentAccess']['X509']
1639            expid = fedid(certstr=expcert)
1640            self.state_lock.acquire()
1641            if expid in self.state:
1642                self.state_lock.release()
1643                raise service_error(service_error.req, 
1644                        'fedid %s identifies an existing experiment' % expid)
1645            self.state_lock.release()
1646        else:
1647            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1648
1649        #now we're done with the tmpdir, and it should be empty
1650        if self.cleanup:
1651            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1652            os.rmdir(tmpdir)
1653        else:
1654            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1655
1656        eid = self.create_experiment_state(fid, req, expid, expcert, 
1657                state='empty')
1658
1659        rv = {
1660                'experimentID': [
1661                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1662                ],
1663                'experimentStatus': 'empty',
1664                'experimentAccess': { 'X509' : expcert },
1665                'proof': proof.to_dict(),
1666            }
1667
1668        return rv
1669
1670    # create_experiment sub-functions
1671
1672    @staticmethod
1673    def get_experiment_key(req, field='experimentID'):
1674        """
1675        Parse the experiment identifiers out of the request (the request body
1676        tag has been removed).  Specifically this pulls either the fedid or the
1677        localname out of the experimentID field.  A fedid is preferred.  If
1678        neither is present or the request does not contain the fields,
1679        service_errors are raised.
1680        """
1681        # Get the experiment access
1682        exp = req.get(field, None)
1683        if exp:
1684            if exp.has_key('fedid'):
1685                key = exp['fedid']
1686            elif exp.has_key('localname'):
1687                key = exp['localname']
1688            else:
1689                raise service_error(service_error.req, "Unknown lookup type")
1690        else:
1691            raise service_error(service_error.req, "No request?")
1692
1693        return key
1694
1695    def get_experiment_ids_and_start(self, key, tmpdir):
1696        """
1697        Get the experiment name, id and access certificate from the state, and
1698        set the experiment state to 'starting'.  returns a triple (fedid,
1699        localname, access_cert_file). The access_cert_file is a copy of the
1700        contents of the access certificate, created in the tempdir with
1701        restricted permissions.  If things are confused, raise an exception.
1702        """
1703
1704        expid = eid = None
1705        self.state_lock.acquire()
1706        if key in self.state:
1707            exp = self.state[key]
1708            exp.status = "starting"
1709            exp.updated()
1710            expid = exp.fedid
1711            eid = exp.localname
1712            expcert = exp.identity
1713        self.state_lock.release()
1714
1715        # make a protected copy of the access certificate so the experiment
1716        # controller can act as the experiment principal.
1717        if expcert:
1718            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1719            if not expcert_file:
1720                raise service_error(service_error.internal, 
1721                        "Cannot create temp cert file?")
1722        else:
1723            expcert_file = None
1724
1725        return (eid, expid, expcert_file)
1726
1727    def get_topology(self, req, tmpdir):
1728        """
1729        Get the ns2 content and put it into a file for parsing.  Call the local
1730        or remote parser and return the topdl.Topology.  Errors result in
1731        exceptions.  req is the request and tmpdir is a work directory.
1732        """
1733
1734        # The tcl parser needs to read a file so put the content into that file
1735        descr=req.get('experimentdescription', None)
1736        if descr:
1737            if 'ns2description' in descr:
1738                file_content=descr['ns2description']
1739            elif 'topdldescription' in descr:
1740                return topdl.Topology(**descr['topdldescription'])
1741            else:
1742                raise service_error(service_error.req, 
1743                        'Unknown experiment description type')
1744        else:
1745            raise service_error(service_error.req, "No experiment description")
1746
1747
1748        if self.splitter_url:
1749            self.log.debug("Calling remote topdl translator at %s" % \
1750                    self.splitter_url)
1751            top = self.remote_ns2topdl(self.splitter_url, file_content)
1752        else:
1753            tclfile = os.path.join(tmpdir, "experiment.tcl")
1754            if file_content:
1755                try:
1756                    f = open(tclfile, 'w')
1757                    f.write(file_content)
1758                    f.close()
1759                except EnvironmentError:
1760                    raise service_error(service_error.internal,
1761                            "Cannot write temp experiment description")
1762            else:
1763                raise service_error(service_error.req, 
1764                        "Only ns2descriptions supported")
1765            pid = "dummy"
1766            gid = "dummy"
1767            eid = "dummy"
1768
1769            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1770                str(self.muxmax), '-m', 'dummy']
1771
1772            tclcmd.extend([pid, gid, eid, tclfile])
1773
1774            self.log.debug("running local splitter %s", " ".join(tclcmd))
1775            # This is just fantastic.  As a side effect the parser copies
1776            # tb_compat.tcl into the current directory, so that directory
1777            # must be writable by the fedd user.  Doing this in the
1778            # temporary subdir ensures this is the case.
1779            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1780                    cwd=tmpdir)
1781            split_data = tclparser.stdout
1782
1783            top = topdl.topology_from_xml(file=split_data, top="experiment")
1784            os.remove(tclfile)
1785
1786        return top
1787
1788    def get_testbed_services(self, req, testbeds):
1789        """
1790        Parse the services section of the request into two dicts mapping
1791        testbed to lists of federated_service objects.  The first dict maps all
1792        exporters of services to those service objects, the second maps
1793        testbeds to service objects only for services requiring portals.
1794        """
1795        # We construct both dicts here because deriving the second is more
1796        # comples than it looks - both the keys and lists can differ, and it's
1797        # much easier to generate both in one pass.
1798        masters = { }
1799        pmasters = { }
1800        for s in req.get('service', []):
1801            # If this is a service request with the importall field
1802            # set, fill it out.
1803
1804            if s.get('importall', False):
1805                s['import'] = [ tb for tb in testbeds \
1806                        if tb not in s.get('export',[])]
1807                del s['importall']
1808
1809            # Add the service to masters
1810            for tb in s.get('export', []):
1811                if s.get('name', None):
1812
1813                    params = { }
1814                    for a in s.get('fedAttr', []):
1815                        params[a.get('attribute', '')] = a.get('value','')
1816
1817                    fser = federated_service(name=s['name'],
1818                            exporter=tb, importers=s.get('import',[]),
1819                            params=params)
1820                    if fser.name == 'hide_hosts' \
1821                            and 'hosts' not in fser.params:
1822                        fser.params['hosts'] = \
1823                                ",".join(tb_hosts.get(fser.exporter, []))
1824                    if tb in masters: masters[tb].append(fser)
1825                    else: masters[tb] = [fser]
1826
1827                    if fser.portal:
1828                        if tb in pmasters: pmasters[tb].append(fser)
1829                        else: pmasters[tb] = [fser]
1830                else:
1831                    self.log.error('Testbed service does not have name " + \
1832                            "and importers')
1833        return masters, pmasters
1834
1835    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1836        """
1837        Create the ssh keys necessary for interconnecting the portal nodes and
1838        the global hosts file for letting each segment know about the IP
1839        addresses in play.  Save these into the repo.  Add attributes to the
1840        autorizer allowing access controllers to download them and return a set
1841        of attributes that inform the segments where to find this stuff.  May
1842        raise service_errors in if there are problems.
1843        """
1844        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1845        gw_secretkey_base = "fed.%s" % self.ssh_type
1846        keydir = os.path.join(tmpdir, 'keys')
1847        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1848        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1849
1850        try:
1851            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1852        except ValueError:
1853            raise service_error(service_error.server_config, 
1854                    "Bad key type (%s)" % self.ssh_type)
1855
1856        self.generate_seer_certs(keydir)
1857
1858        # Copy configuration files into the remote file store
1859        # The config urlpath
1860        configpath = "/%s/config" % expid
1861        # The config file system location
1862        configdir ="%s%s" % ( self.repodir, configpath)
1863        try:
1864            os.makedirs(configdir)
1865        except EnvironmentError, e:
1866            raise service_error(service_error.internal,
1867                    "Cannot create config directory: %s" % e)
1868        try:
1869            f = open("%s/hosts" % configdir, "w")
1870            print >> f, string.join(hosts, '\n')
1871            f.close()
1872        except EnvironmentError, e:
1873            raise service_error(service_error.internal, 
1874                    "Cannot write hosts file: %s" % e)
1875        try:
1876            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1877            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1878            copy_file(os.path.join(keydir, 'ca.pem'), 
1879                    os.path.join(configdir, 'ca.pem'))
1880            copy_file(os.path.join(keydir, 'node.pem'), 
1881                    os.path.join(configdir, 'node.pem'))
1882        except EnvironmentError, e:
1883            raise service_error(service_error.internal, 
1884                    "Cannot copy keyfiles: %s" % e)
1885
1886        # Allow the individual testbeds to access the configuration files,
1887        # again by setting an attribute for the relevant pathnames on each
1888        # allocation principal.  Yeah, that's a long list comprehension.
1889        self.append_experiment_authorization(expid, set([
1890            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1891                    for tb in tbparams.keys() \
1892                        for f in ("hosts", 'ca.pem', 'node.pem', 
1893                            gw_secretkey_base, gw_pubkey_base)]))
1894
1895        attrs = [ 
1896                {
1897                    'attribute': 'ssh_pubkey', 
1898                    'value': '%s/%s/config/%s' % \
1899                            (self.repo_url, expid, gw_pubkey_base)
1900                },
1901                {
1902                    'attribute': 'ssh_secretkey', 
1903                    'value': '%s/%s/config/%s' % \
1904                            (self.repo_url, expid, gw_secretkey_base)
1905                },
1906                {
1907                    'attribute': 'hosts', 
1908                    'value': '%s/%s/config/hosts' % \
1909                            (self.repo_url, expid)
1910                },
1911                {
1912                    'attribute': 'seer_ca_pem', 
1913                    'value': '%s/%s/config/%s' % \
1914                            (self.repo_url, expid, 'ca.pem')
1915                },
1916                {
1917                    'attribute': 'seer_node_pem', 
1918                    'value': '%s/%s/config/%s' % \
1919                            (self.repo_url, expid, 'node.pem')
1920                },
1921            ]
1922        return attrs
1923
1924
1925    def get_vtopo(self, req, fid):
1926        """
1927        Return the stored virtual topology for this experiment
1928        """
1929        rv = None
1930        state = None
1931
1932        req = req.get('VtopoRequestBody', None)
1933        if not req:
1934            raise service_error(service_error.req,
1935                    "Bad request format (no VtopoRequestBody)")
1936        exp = req.get('experiment', None)
1937        if exp:
1938            if exp.has_key('fedid'):
1939                key = exp['fedid']
1940                keytype = "fedid"
1941            elif exp.has_key('localname'):
1942                key = exp['localname']
1943                keytype = "localname"
1944            else:
1945                raise service_error(service_error.req, "Unknown lookup type")
1946        else:
1947            raise service_error(service_error.req, "No request?")
1948
1949        proof = self.check_experiment_access(fid, key)
1950
1951        self.state_lock.acquire()
1952        # XXX: this needs to be recalculated
1953        if key in self.state:
1954            if self.state[key].top is not None:
1955                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1956                rv = { 'experiment' : {keytype: key },
1957                        'vtopo': vtopo,
1958                        'proof': proof.to_dict(), 
1959                    }
1960            else:
1961                state = self.state[key].status
1962        self.state_lock.release()
1963
1964        if rv: return rv
1965        else: 
1966            if state:
1967                raise service_error(service_error.partial, 
1968                        "Not ready: %s" % state)
1969            else:
1970                raise service_error(service_error.req, "No such experiment")
1971
1972    def get_vis(self, req, fid):
1973        """
1974        Return the stored visualization for this experiment
1975        """
1976        rv = None
1977        state = None
1978
1979        req = req.get('VisRequestBody', None)
1980        if not req:
1981            raise service_error(service_error.req,
1982                    "Bad request format (no VisRequestBody)")
1983        exp = req.get('experiment', None)
1984        if exp:
1985            if exp.has_key('fedid'):
1986                key = exp['fedid']
1987                keytype = "fedid"
1988            elif exp.has_key('localname'):
1989                key = exp['localname']
1990                keytype = "localname"
1991            else:
1992                raise service_error(service_error.req, "Unknown lookup type")
1993        else:
1994            raise service_error(service_error.req, "No request?")
1995
1996        proof = self.check_experiment_access(fid, key)
1997
1998        self.state_lock.acquire()
1999        # Generate the visualization
2000        if key in self.state:
2001            if self.state[key].top is not None:
2002                try:
2003                    vis = self.genviz(
2004                            topdl.topology_to_vtopo(self.state[key].topo))
2005                except service_error, e:
2006                    self.state_lock.release()
2007                    raise e
2008                rv =  { 'experiment' : {keytype: key },
2009                        'vis': vis,
2010                        'proof': proof.to_dict(), 
2011                        }
2012            else:
2013                state = self.state[key].status
2014        self.state_lock.release()
2015
2016        if rv: return rv
2017        else:
2018            if state:
2019                raise service_error(service_error.partial, 
2020                        "Not ready: %s" % state)
2021            else:
2022                raise service_error(service_error.req, "No such experiment")
2023
2024   
2025    def save_federant_information(self, allocated, tbparams, eid, top):
2026        """
2027        Store the various data that have changed in the experiment state
2028        between when it was started and the beginning of resource allocation.
2029        This is basically the information about each local allocation.  This
2030        fills in the values of the placeholder allocation in the state.  It
2031        also collects the access proofs and returns them as dicts for a
2032        response message.
2033        """
2034        self.state_lock.acquire()
2035        exp = self.state[eid]
2036        exp.top = top.clone()
2037        # save federant information
2038        for k in allocated.keys():
2039            exp.add_allocation(tbparams[k])
2040            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2041                type="testbed", localname=[k], 
2042                service=[ s.to_topdl() for s in tbparams[k].services]))
2043
2044        # Access proofs for the response message
2045        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2046                    for p in tbparams[k].proof]
2047        exp.updated()
2048        if self.state_filename: 
2049            self.write_state()
2050        self.state_lock.release()
2051        return proofs
2052
2053    def clear_placeholder(self, eid, expid, tmpdir):
2054        """
2055        Clear the placeholder and remove any allocated temporary dir.
2056        """
2057
2058        self.state_lock.acquire()
2059        del self.state[eid]
2060        del self.state[expid]
2061        if self.state_filename: self.write_state()
2062        self.state_lock.release()
2063        if tmpdir and self.cleanup:
2064            self.remove_dirs(tmpdir)
2065
2066    # end of create_experiment sub-functions
2067
2068    def create_experiment(self, req, fid):
2069        """
2070        The external interface to experiment creation called from the
2071        dispatcher.
2072
2073        Creates a working directory, splits the incoming description using the
2074        splitter script and parses out the various subsections using the
2075        classes above.  Once each sub-experiment is created, use pooled threads
2076        to instantiate them and start it all up.
2077        """
2078
2079        req = req.get('CreateRequestBody', None)
2080        if req:
2081            key = self.get_experiment_key(req)
2082        else:
2083            raise service_error(service_error.req,
2084                    "Bad request format (no CreateRequestBody)")
2085
2086        # Import information from the requester
2087        if self.auth.import_credentials(data_list=req.get('credential', [])):
2088            self.auth.save()
2089
2090        # Make sure that the caller can talk to us
2091        proof = self.check_experiment_access(fid, key)
2092
2093        # Install the testbed map entries supplied with the request into a copy
2094        # of the testbed map.
2095        tbmap = dict(self.tbmap)
2096        for m in req.get('testbedmap', []):
2097            if 'testbed' in m and 'uri' in m:
2098                tbmap[m['testbed']] = m['uri']
2099
2100        # a place to work
2101        try:
2102            tmpdir = tempfile.mkdtemp(prefix="split-")
2103            os.mkdir(tmpdir+"/keys")
2104        except EnvironmentError:
2105            raise service_error(service_error.internal, "Cannot create tmp dir")
2106
2107        tbparams = { }
2108
2109        eid, expid, expcert_file = \
2110                self.get_experiment_ids_and_start(key, tmpdir)
2111
2112        # This catches exceptions to clear the placeholder if necessary
2113        try: 
2114            if not (eid and expid):
2115                raise service_error(service_error.internal, 
2116                        "Cannot find local experiment info!?")
2117
2118            top = self.get_topology(req, tmpdir)
2119            self.confirm_software(top)
2120            # Assign the IPs
2121            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2122            # Find the testbeds to look up
2123            tb_hosts = { }
2124            testbeds = [ ]
2125            for e in top.elements:
2126                if isinstance(e, topdl.Computer):
2127                    tb = e.get_attribute('testbed') or 'default'
2128                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2129                    else: 
2130                        tb_hosts[tb] = [ e.name ]
2131                        testbeds.append(tb)
2132
2133            masters, pmasters = self.get_testbed_services(req, testbeds)
2134            allocated = { }         # Testbeds we can access
2135            topo ={ }               # Sub topologies
2136            connInfo = { }          # Connection information
2137
2138            self.split_topology(top, topo, testbeds)
2139
2140            self.get_access_to_testbeds(testbeds, fid, allocated, 
2141                    tbparams, masters, tbmap, expid, expcert_file)
2142
2143            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2144
2145            part = experiment_partition(self.auth, self.store_url, tbmap,
2146                    self.muxmax, self.direct_transit)
2147            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2148                    connInfo, expid)
2149
2150            auth_attrs = set()
2151            # Now get access to the dynamic testbeds (those added above)
2152            for tb in [ t for t in topo if t not in allocated]:
2153                self.get_access(tb, tbparams, fid, masters, tbmap, 
2154                        expid, expcert_file)
2155                allocated[tb] = 1
2156                store_keys = topo[tb].get_attribute('store_keys')
2157                # Give the testbed access to keys it exports or imports
2158                if store_keys:
2159                    auth_attrs.update(set([
2160                        (tbparams[tb].allocID, sk) \
2161                                for sk in store_keys.split(" ")]))
2162
2163            if auth_attrs:
2164                self.append_experiment_authorization(expid, auth_attrs)
2165
2166            # transit and disconnected testbeds may not have a connInfo entry.
2167            # Fill in the blanks.
2168            for t in allocated.keys():
2169                if not connInfo.has_key(t):
2170                    connInfo[t] = { }
2171
2172            self.wrangle_software(expid, top, topo, tbparams)
2173
2174            proofs = self.save_federant_information(allocated, tbparams, 
2175                    eid, top)
2176        except service_error, e:
2177            # If something goes wrong in the parse (usually an access error)
2178            # clear the placeholder state.  From here on out the code delays
2179            # exceptions.  Failing at this point returns a fault to the remote
2180            # caller.
2181            self.clear_placeholder(eid, expid, tmpdir)
2182            raise e
2183
2184        # Start the background swapper and return the starting state.  From
2185        # here on out, the state will stick around a while.
2186
2187        # Create a logger that logs to the experiment's state object as well as
2188        # to the main log file.
2189        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2190        alloc_collector = self.list_log(self.state[eid].log)
2191        h = logging.StreamHandler(alloc_collector)
2192        # XXX: there should be a global one of these rather than repeating the
2193        # code.
2194        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2195                    '%d %b %y %H:%M:%S'))
2196        alloc_log.addHandler(h)
2197
2198        # Start a thread to do the resource allocation
2199        t  = Thread(target=self.allocate_resources,
2200                args=(allocated, masters, eid, expid, tbparams, 
2201                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2202                    connInfo, tbmap, expcert_file),
2203                name=eid)
2204        t.start()
2205
2206        rv = {
2207                'experimentID': [
2208                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2209                ],
2210                'experimentStatus': 'starting',
2211                'proof': [ proof.to_dict() ] + proofs,
2212            }
2213
2214        return rv
2215   
2216    def get_experiment_fedid(self, key):
2217        """
2218        find the fedid associated with the localname key in the state database.
2219        """
2220
2221        rv = None
2222        self.state_lock.acquire()
2223        if key in self.state:
2224            rv = self.state[key].fedid
2225        self.state_lock.release()
2226        return rv
2227
2228    def check_experiment_access(self, fid, key):
2229        """
2230        Confirm that the fid has access to the experiment.  Though a request
2231        may be made in terms of a local name, the access attribute is always
2232        the experiment's fedid.
2233        """
2234        if not isinstance(key, fedid):
2235            key = self.get_experiment_fedid(key)
2236
2237        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2238
2239        if access_ok:
2240            return proof
2241        else:
2242            raise service_error(service_error.access, "Access Denied",
2243                proof)
2244
2245
2246    def get_handler(self, path, fid):
2247        """
2248        Perhaps surprisingly named, this function handles HTTP GET requests to
2249        this server (SOAP requests are POSTs).
2250        """
2251        self.log.info("Get handler %s %s" % (path, fid))
2252        # XXX: log proofs?
2253        if self.auth.check_attribute(fid, path):
2254            return ("%s/%s" % (self.repodir, path), "application/binary")
2255        else:
2256            return (None, None)
2257
2258    def update_info(self, key, force=False):
2259        top = None
2260        self.state_lock.acquire()
2261        if key in self.state:
2262            if force or self.state[key].older_than(self.info_cache_limit):
2263                top = self.state[key].top
2264                if top is not None: top = top.clone()
2265                d1, info_params, cert, d2 = \
2266                        self.get_segment_info(self.state[key], need_lock=False)
2267        self.state_lock.release()
2268
2269        if top is None: return
2270
2271        try:
2272            tmpdir = tempfile.mkdtemp(prefix="info-")
2273        except EnvironmentError:
2274            raise service_error(service_error.internal, 
2275                    "Cannot create tmp dir")
2276        cert_file = self.make_temp_certfile(cert, tmpdir)
2277
2278        data = []
2279        try:
2280            for k, (uri, aid) in info_params.items():
2281                info=self.info_segment(log=self.log, testbed=uri,
2282                            cert_file=cert_file, cert_pwd=None,
2283                            trusted_certs=self.trusted_certs,
2284                            caller=self.call_InfoSegment)
2285                info(uri, aid)
2286                data.append(info)
2287        # Clean up the tmpdir no matter what
2288        finally:
2289            if tmpdir: self.remove_dirs(tmpdir)
2290
2291        self.annotate_topology(top, data)
2292        self.state_lock.acquire()
2293        if key in self.state:
2294            self.state[key].top = top
2295            self.state[key].updated()
2296            if self.state_filename: self.write_state()
2297        self.state_lock.release()
2298
2299   
2300    def get_info(self, req, fid):
2301        """
2302        Return all the stored info about this experiment
2303        """
2304        rv = None
2305
2306        req = req.get('InfoRequestBody', None)
2307        if not req:
2308            raise service_error(service_error.req,
2309                    "Bad request format (no InfoRequestBody)")
2310        exp = req.get('experiment', None)
2311        legacy = req.get('legacy', False)
2312        fresh = req.get('fresh', False)
2313        if exp:
2314            if exp.has_key('fedid'):
2315                key = exp['fedid']
2316                keytype = "fedid"
2317            elif exp.has_key('localname'):
2318                key = exp['localname']
2319                keytype = "localname"
2320            else:
2321                raise service_error(service_error.req, "Unknown lookup type")
2322        else:
2323            raise service_error(service_error.req, "No request?")
2324
2325        proof = self.check_experiment_access(fid, key)
2326
2327        self.update_info(key, fresh)
2328
2329        self.state_lock.acquire()
2330        if self.state.has_key(key):
2331            rv = self.state[key].get_info()
2332            # Copy the topo if we need legacy annotations
2333            if legacy:
2334                top = self.state[key].top
2335                if top is not None: top = top.clone()
2336        self.state_lock.release()
2337
2338        # If the legacy visualization and topology representations are
2339        # requested, calculate them and add them to the return.
2340        if legacy and rv is not None:
2341            if top is not None:
2342                vtopo = topdl.topology_to_vtopo(top)
2343                if vtopo is not None:
2344                    rv['vtopo'] = vtopo
2345                    try:
2346                        vis = self.genviz(vtopo)
2347                    except service_error, e:
2348                        self.log.debug('Problem generating visualization: %s' \
2349                                % e)
2350                        vis = None
2351                    if vis is not None:
2352                        rv['vis'] = vis
2353        if rv:
2354            rv['proof'] = proof.to_dict()
2355            return rv
2356        else:
2357            raise service_error(service_error.req, "No such experiment")
2358
2359    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2360            results):
2361        """
2362        Call OperateSegment on multiple testbeds and gather the results.
2363        op_params contains the parameters needed to contact that testbed, cert
2364        is a certificate containing the fedid to use, op is the operation,
2365        testbeds is a dict mapping testbed name to targets in that testbed,
2366        params are the parameters to include a,d results is a growing list of
2367        the results of the calls.
2368        """
2369        try:
2370            tmpdir = tempfile.mkdtemp(prefix="info-")
2371        except EnvironmentError:
2372            raise service_error(service_error.internal, 
2373                    "Cannot create tmp dir")
2374        cert_file = self.make_temp_certfile(cert, tmpdir)
2375
2376        try:
2377            for tb, targets in testbeds.items():
2378                if tb in op_params:
2379                    uri, aid = op_params[tb]
2380                    operate=self.operation_segment(log=self.log, testbed=uri,
2381                                cert_file=cert_file, cert_pwd=None,
2382                                trusted_certs=self.trusted_certs,
2383                                caller=self.call_OperationSegment)
2384                    if operate(uri, aid, op, targets, params):
2385                        if operate.status is not None:
2386                            results.extend(operate.status)
2387                            continue
2388                # Something went wrong in a weird way.  Add statuses
2389                # that reflect that to results
2390                for t in targets:
2391                    results.append(operation_status(t, 
2392                        operation_status.federant,
2393                        'Unexpected error on %s' % tb))
2394        # Clean up the tmpdir no matter what
2395        finally:
2396            if tmpdir: self.remove_dirs(tmpdir)
2397
2398    def do_operation(self, req, fid):
2399        """
2400        Find the testbeds holding each target and ask them to carry out the
2401        operation.  Return the statuses.
2402        """
2403        # Map an element to the testbed containing it
2404        def element_to_tb(e):
2405            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2406            elif isinstance(e, topdl.Testbed): return e.name
2407            else: return None
2408        # If d is an operation_status object, make it a dict
2409        def make_dict(d):
2410            if isinstance(d, dict): return d
2411            elif isinstance(d, operation_status): return d.to_dict()
2412            else: return { }
2413
2414        def element_name(e):
2415            if isinstance(e, topdl.Computer): return e.name
2416            elif isinstance(e, topdl.Testbed): 
2417                if e.localname: return e.localname[0]
2418                else: return None
2419            else: return None
2420
2421        req = req.get('OperationRequestBody', None)
2422        if not req:
2423            raise service_error(service_error.req,
2424                    "Bad request format (no OperationRequestBody)")
2425        exp = req.get('experiment', None)
2426        op = req.get('operation', None)
2427        targets = set(req.get('target', []))
2428        params = req.get('parameter', None)
2429
2430        if exp:
2431            if 'fedid' in exp:
2432                key = exp['fedid']
2433                keytype = "fedid"
2434            elif 'localname' in exp:
2435                key = exp['localname']
2436                keytype = "localname"
2437            else:
2438                raise service_error(service_error.req, "Unknown lookup type")
2439        else:
2440            raise service_error(service_error.req, "No request?")
2441
2442        if op is None or not targets:
2443            raise service_error(service_error.req, "No request?")
2444
2445        proof = self.check_experiment_access(fid, key)
2446        self.state_lock.acquire()
2447        if key in self.state:
2448            d1, op_params, cert, d2 = \
2449                    self.get_segment_info(self.state[key], need_lock=False,
2450                            key='tb')
2451            top = self.state[key].top
2452            if top is not None:
2453                top = top.clone()
2454        self.state_lock.release()
2455
2456        if top is None:
2457            raise service_error(service_error.partial, "No topology yet", 
2458                    proof=proof)
2459
2460        testbeds = { }
2461        results = []
2462        for e in top.elements:
2463            ename = element_name(e)
2464            if ename in targets:
2465                tb = element_to_tb(e)
2466                targets.remove(ename)
2467                if tb is not None:
2468                    if tb in testbeds: testbeds[tb].append(ename)
2469                    else: testbeds[tb] = [ ename ]
2470                else:
2471                    results.append(operation_status(e.name, 
2472                        code=operation_status.no_target, 
2473                        description='Cannot map target to testbed'))
2474
2475        for t in targets:
2476            results.append(operation_status(t, operation_status.no_target))
2477
2478        self.operate_on_segments(op_params, cert, op, testbeds, params,
2479                results)
2480
2481        return { 
2482                'experiment': exp, 
2483                'status': [make_dict(r) for r in results],
2484                'proof': proof.to_dict()
2485                }
2486
2487
2488    def get_multi_info(self, req, fid):
2489        """
2490        Return all the stored info that this fedid can access
2491        """
2492        rv = { 'info': [ ], 'proof': [ ] }
2493
2494        self.state_lock.acquire()
2495        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2496            try:
2497                proof = self.check_experiment_access(fid, key)
2498            except service_error, e:
2499                if e.code == service_error.access:
2500                    continue
2501                else:
2502                    self.state_lock.release()
2503                    raise e
2504
2505            if self.state.has_key(key):
2506                e = self.state[key].get_info()
2507                e['proof'] = proof.to_dict()
2508                rv['info'].append(e)
2509                rv['proof'].append(proof.to_dict())
2510        self.state_lock.release()
2511        return rv
2512
2513    def check_termination_status(self, fed_exp, force):
2514        """
2515        Confirm that the experiment is sin a valid state to stop (or force it)
2516        return the state - invalid states for deletion and force settings cause
2517        exceptions.
2518        """
2519        self.state_lock.acquire()
2520        status = fed_exp.status
2521
2522        if status:
2523            if status in ('starting', 'terminating'):
2524                if not force:
2525                    self.state_lock.release()
2526                    raise service_error(service_error.partial, 
2527                            'Experiment still being created or destroyed')
2528                else:
2529                    self.log.warning('Experiment in %s state ' % status + \
2530                            'being terminated by force.')
2531            self.state_lock.release()
2532            return status
2533        else:
2534            # No status??? trouble
2535            self.state_lock.release()
2536            raise service_error(service_error.internal,
2537                    "Experiment has no status!?")
2538
2539    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2540        ids = []
2541        term_params = { }
2542        if need_lock: self.state_lock.acquire()
2543        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2544        expcert = fed_exp.identity
2545        repo = "%s" % fed_exp.fedid
2546
2547        # Collect the allocation/segment ids into a dict keyed by the fedid
2548        # of the allocation that contains a tuple of uri, aid
2549        for i, fed in enumerate(fed_exp.get_all_allocations()):
2550            uri = fed.uri
2551            aid = fed.allocID
2552            if key == 'aid': term_params[aid] = (uri, aid)
2553            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2554
2555        if need_lock: self.state_lock.release()
2556        return ids, term_params, expcert, repo
2557
2558
2559    def get_termination_info(self, fed_exp):
2560        self.state_lock.acquire()
2561        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2562        # Change the experiment state
2563        fed_exp.status = 'terminating'
2564        fed_exp.updated()
2565        if self.state_filename: self.write_state()
2566        self.state_lock.release()
2567
2568        return ids, term_params, expcert, repo
2569
2570
2571    def deallocate_resources(self, term_params, expcert, status, force, 
2572            dealloc_log):
2573        tmpdir = None
2574        # This try block makes sure the tempdir is cleared
2575        try:
2576            # If no expcert, try the deallocation as the experiment
2577            # controller instance.
2578            if expcert and self.auth_type != 'legacy': 
2579                try:
2580                    tmpdir = tempfile.mkdtemp(prefix="term-")
2581                except EnvironmentError:
2582                    raise service_error(service_error.internal, 
2583                            "Cannot create tmp dir")
2584                cert_file = self.make_temp_certfile(expcert, tmpdir)
2585                pw = None
2586            else: 
2587                cert_file = self.cert_file
2588                pw = self.cert_pwd
2589
2590            # Stop everyone.  NB, wait_for_all waits until a thread starts
2591            # and then completes, so we can't wait if nothing starts.  So,
2592            # no tbparams, no start.
2593            if len(term_params) > 0:
2594                tp = thread_pool(self.nthreads)
2595                for k, (uri, aid) in term_params.items():
2596                    # Create and start a thread to stop the segment
2597                    tp.wait_for_slot()
2598                    t  = pooled_thread(\
2599                            target=self.terminate_segment(log=dealloc_log,
2600                                testbed=uri,
2601                                cert_file=cert_file, 
2602                                cert_pwd=pw,
2603                                trusted_certs=self.trusted_certs,
2604                                caller=self.call_TerminateSegment),
2605                            args=(uri, aid), name=k,
2606                            pdata=tp, trace_file=self.trace_file)
2607                    t.start()
2608                # Wait for completions
2609                tp.wait_for_all_done()
2610
2611            # release the allocations (failed experiments have done this
2612            # already, and starting experiments may be in odd states, so we
2613            # ignore errors releasing those allocations
2614            try: 
2615                for k, (uri, aid)  in term_params.items():
2616                    self.release_access(None, aid, uri=uri,
2617                            cert_file=cert_file, cert_pwd=pw)
2618            except service_error, e:
2619                if status != 'failed' and not force:
2620                    raise e
2621
2622        # Clean up the tmpdir no matter what
2623        finally:
2624            if tmpdir: self.remove_dirs(tmpdir)
2625
2626    def terminate_experiment(self, req, fid):
2627        """
2628        Swap this experiment out on the federants and delete the shared
2629        information
2630        """
2631        tbparams = { }
2632        req = req.get('TerminateRequestBody', None)
2633        if not req:
2634            raise service_error(service_error.req,
2635                    "Bad request format (no TerminateRequestBody)")
2636
2637        key = self.get_experiment_key(req, 'experiment')
2638        proof = self.check_experiment_access(fid, key)
2639        exp = req.get('experiment', False)
2640        force = req.get('force', False)
2641
2642        dealloc_list = [ ]
2643
2644
2645        # Create a logger that logs to the dealloc_list as well as to the main
2646        # log file.
2647        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2648        h = logging.StreamHandler(self.list_log(dealloc_list))
2649        # XXX: there should be a global one of these rather than repeating the
2650        # code.
2651        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2652                    '%d %b %y %H:%M:%S'))
2653        dealloc_log.addHandler(h)
2654
2655        self.state_lock.acquire()
2656        fed_exp = self.state.get(key, None)
2657        self.state_lock.release()
2658        repo = None
2659
2660        if fed_exp:
2661            status = self.check_termination_status(fed_exp, force)
2662            # get_termination_info updates the experiment state
2663            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2664            self.deallocate_resources(term_params, expcert, status, force, 
2665                    dealloc_log)
2666
2667            # Remove the terminated experiment
2668            self.state_lock.acquire()
2669            for id in ids:
2670                self.clear_experiment_authorization(id, need_state_lock=False)
2671                if id in self.state: del self.state[id]
2672
2673            if self.state_filename: self.write_state()
2674            self.state_lock.release()
2675
2676            # Delete any synch points associated with this experiment.  All
2677            # synch points begin with the fedid of the experiment.
2678            fedid_keys = set(["fedid:%s" % f for f in ids \
2679                    if isinstance(f, fedid)])
2680            for k in self.synch_store.all_keys():
2681                try:
2682                    if len(k) > 45 and k[0:46] in fedid_keys:
2683                        self.synch_store.del_value(k)
2684                except synch_store.BadDeletionError:
2685                    pass
2686            self.write_store()
2687
2688            # Remove software and other cached stuff from the filesystem.
2689            if repo:
2690                self.remove_dirs("%s/%s" % (self.repodir, repo))
2691       
2692            return { 
2693                    'experiment': exp , 
2694                    'deallocationLog': string.join(dealloc_list, ''),
2695                    'proof': [proof.to_dict()],
2696                    }
2697        else:
2698            raise service_error(service_error.req, "No saved state")
2699
2700
2701    def GetValue(self, req, fid):
2702        """
2703        Get a value from the synchronized store
2704        """
2705        req = req.get('GetValueRequestBody', None)
2706        if not req:
2707            raise service_error(service_error.req,
2708                    "Bad request format (no GetValueRequestBody)")
2709       
2710        name = req.get('name', None)
2711        wait = req.get('wait', False)
2712        rv = { 'name': name }
2713
2714        if not name:
2715            raise service_error(service_error.req, "No name?")
2716
2717        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2718
2719        if access_ok:
2720            self.log.debug("[GetValue] asking for %s " % name)
2721            try:
2722                v = self.synch_store.get_value(name, wait)
2723            except synch_store.RevokedKeyError:
2724                # No more synch on this key
2725                raise service_error(service_error.federant, 
2726                        "Synch key %s revoked" % name)
2727            if v is not None:
2728                rv['value'] = v
2729            rv['proof'] = proof.to_dict()
2730            self.log.debug("[GetValue] got %s from %s" % (v, name))
2731            return rv
2732        else:
2733            raise service_error(service_error.access, "Access Denied",
2734                    proof=proof)
2735       
2736
2737    def SetValue(self, req, fid):
2738        """
2739        Set a value in the synchronized store
2740        """
2741        req = req.get('SetValueRequestBody', None)
2742        if not req:
2743            raise service_error(service_error.req,
2744                    "Bad request format (no SetValueRequestBody)")
2745       
2746        name = req.get('name', None)
2747        v = req.get('value', '')
2748
2749        if not name:
2750            raise service_error(service_error.req, "No name?")
2751
2752        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2753
2754        if access_ok:
2755            try:
2756                self.synch_store.set_value(name, v)
2757                self.write_store()
2758                self.log.debug("[SetValue] set %s to %s" % (name, v))
2759            except synch_store.CollisionError:
2760                # Translate into a service_error
2761                raise service_error(service_error.req,
2762                        "Value already set: %s" %name)
2763            except synch_store.RevokedKeyError:
2764                # No more synch on this key
2765                raise service_error(service_error.federant, 
2766                        "Synch key %s revoked" % name)
2767                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2768        else:
2769            raise service_error(service_error.access, "Access Denied",
2770                    proof=proof)
Note: See TracBrowser for help on using the repository browser.