source: fedd/federation/experiment_control.py @ a11eda5

compt_changes
Last change on this file since a11eda5 was a11eda5, checked in by Ted Faber <faber@…>, 12 years ago

Add support for testbeds to indicate preference for outgoing portal
connections.

  • Property mode set to 100644
File size: 92.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
133        dt = config.get("experiment_control", "direct_transit")
134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
144
145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
152
153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
184            self.get_access = self.legacy_get_access
185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323
324    def read_accessdb(self, accessdb_file):
325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
370        except EnvironmentError:
371            raise service_error(service_error.internal,
372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
374        f.close()
375
376        # Initialize the authorization attributes
377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
382
383    def read_mapdb(self, file):
384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        also adds testbeds to active if they include , active after
388        their name.
389        """
390
391        self.tbmap = { }
392        self.tbactive = set()
393        lineno =0
394        try:
395            f = open(file, "r")
396            for line in f:
397                lineno += 1
398                line = line.strip()
399                if line.startswith('#') or len(line) == 0:
400                    continue
401                try:
402                    label, url = line.split(':', 1)
403                    if ',' in label:
404                        label, act = label.split(',', 1)
405                        active = (act.strip() == 'active')
406                    else:
407                        active = False
408                    self.tbmap[label] = url
409                    if active: self.tbactive.add(label)
410                except ValueError, e:
411                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
412                            "map db: %s %s" % (lineno, line, e))
413        except EnvironmentError, e:
414            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
415                    "open %s: %s" % (file, e))
416        else:
417            f.close()
418
419    def read_store(self):
420        try:
421            self.synch_store = synch_store()
422            self.synch_store.load(self.store_filename)
423            self.log.debug("[read_store]: Read store from %s" % \
424                    self.store_filename)
425        except EnvironmentError, e:
426            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
427                    % (self.state_filename, e))
428            self.synch_store = synch_store()
429
430        # Set the initial permissions on data in the store.  XXX: This ad hoc
431        # authorization attribute initialization is getting out of hand.
432        # XXX: legacy
433        if self.auth_type == 'legacy':
434            for k in self.synch_store.all_keys():
435                try:
436                    if k.startswith('fedid:'):
437                        fid = fedid(hexstr=k[6:46])
438                        if self.state.has_key(fid):
439                            for a in self.get_alloc_ids(self.state[fid]):
440                                self.auth.set_attribute(a, k)
441                except ValueError, e:
442                    self.log.warn("Cannot deduce permissions for %s" % k)
443
444
445    def write_store(self):
446        """
447        Write a new copy of synch_store after writing current state
448        to a backup.  We use the internal synch_store pickle method to avoid
449        incinsistent data.
450
451        State format is a simple pickling of the store.
452        """
453        if os.access(self.store_filename, os.W_OK):
454            copy_file(self.store_filename, \
455                    "%s.bak" % self.store_filename)
456        try:
457            self.synch_store.save(self.store_filename)
458        except EnvironmentError, e:
459            self.log.error("Can't write file %s: %s" % \
460                    (self.store_filename, e))
461        except TypeError, e:
462            self.log.error("Pickling problem (TypeError): %s" % e)
463
464
465    def remove_dirs(self, dir):
466        """
467        Remove the directory tree and all files rooted at dir.  Log any errors,
468        but continue.
469        """
470        self.log.debug("[removedirs]: removing %s" % dir)
471        try:
472            for path, dirs, files in os.walk(dir, topdown=False):
473                for f in files:
474                    os.remove(os.path.join(path, f))
475                for d in dirs:
476                    os.rmdir(os.path.join(path, d))
477            os.rmdir(dir)
478        except EnvironmentError, e:
479            self.log.error("Error deleting directory tree in %s" % e);
480
481    @staticmethod
482    def make_temp_certfile(expcert, tmpdir):
483        """
484        make a protected copy of the access certificate so the experiment
485        controller can act as the experiment principal.  mkstemp is the most
486        secure way to do that. The directory should be created by
487        mkdtemp.  Return the filename.
488        """
489        if expcert and tmpdir:
490            try:
491                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
492                f = os.fdopen(certf, 'w')
493                print >> f, expcert
494                f.close()
495            except EnvironmentError, e:
496                raise service_error(service_error.internal, 
497                        "Cannot create temp cert file?")
498            return certfn
499        else:
500            return None
501
502       
503    def generate_ssh_keys(self, dest, type="rsa" ):
504        """
505        Generate a set of keys for the gateways to use to talk.
506
507        Keys are of type type and are stored in the required dest file.
508        """
509        valid_types = ("rsa", "dsa")
510        t = type.lower();
511        if t not in valid_types: raise ValueError
512        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
513
514        try:
515            trace = open("/dev/null", "w")
516        except EnvironmentError:
517            raise service_error(service_error.internal,
518                    "Cannot open /dev/null??");
519
520        # May raise CalledProcessError
521        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
522        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
523        if rv != 0:
524            raise service_error(service_error.internal, 
525                    "Cannot generate nonce ssh keys.  %s return code %d" \
526                            % (self.ssh_keygen, rv))
527
528    def generate_seer_certs(self, destdir):
529        '''
530        Create a SEER ca cert and a node cert in destdir/ca.pem and
531        destdir/node.pem respectively.  These will be distributed throughout
532        the federated experiment.  This routine reports errors via
533        service_errors.
534        '''
535        openssl = '/usr/bin/openssl'
536        # All the filenames and parameters we need for openssl calls below
537        ca_key =os.path.join(destdir, 'ca.key') 
538        ca_pem = os.path.join(destdir, 'ca.pem')
539        node_key =os.path.join(destdir, 'node.key') 
540        node_pem = os.path.join(destdir, 'node.pem')
541        node_req = os.path.join(destdir, 'node.req')
542        node_signed = os.path.join(destdir, 'node.signed')
543        days = '%s' % (365 * 10)
544        serial = '%s' % random.randint(0, 1<<16)
545
546        try:
547            # Sequence of calls to create a CA key, create a ca cert, create a
548            # node key, node signing request, and finally a signed node
549            # certificate.
550            sequence = (
551                    (openssl, 'genrsa', '-out', ca_key, '1024'),
552                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
553                        ca_pem, '-days', days, '-subj', 
554                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
555                    (openssl, 'genrsa', '-out', node_key, '1024'),
556                    (openssl, 'req', '-new', '-key', node_key, '-out', 
557                        node_req, '-days', days, '-subj', 
558                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
559                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
560                        '-set_serial', serial, '-req', '-in', node_req, 
561                        '-out', node_signed, '-days', days),
562                )
563            # Do all that stuff; bail if there's an error, and push all the
564            # output to dev/null.
565            for cmd in sequence:
566                trace = open("/dev/null", "w")
567                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
568                if rv != 0:
569                    raise service_error(service_error.internal, 
570                            "Cannot generate SEER certs.  %s return code %d" \
571                                    % (' '.join(cmd), rv))
572            # Concatinate the node key and signed certificate into node.pem
573            f = open(node_pem, 'w')
574            for comp in (node_signed, node_key):
575                g = open(comp, 'r')
576                f.write(g.read())
577                g.close()
578            f.close()
579
580            # Throw out intermediaries.
581            for fn in (ca_key, node_key, node_req, node_signed):
582                os.unlink(fn)
583
584        except EnvironmentError, e:
585            # Any difficulties with the file system wind up here
586            raise service_error(service_error.internal,
587                    "File error on  %s while creating SEER certs: %s" % \
588                            (e.filename, e.strerror))
589
590
591
592    def gentopo(self, str):
593        """
594        Generate the topology data structure from the splitter's XML
595        representation of it.
596
597        The topology XML looks like:
598            <experiment>
599                <nodes>
600                    <node><vname></vname><ips>ip1:ip2</ips></node>
601                </nodes>
602                <lans>
603                    <lan>
604                        <vname></vname><vnode></vnode><ip></ip>
605                        <bandwidth></bandwidth><member>node:port</member>
606                    </lan>
607                </lans>
608        """
609        class topo_parse:
610            """
611            Parse the topology XML and create the dats structure.
612            """
613            def __init__(self):
614                # Typing of the subelements for data conversion
615                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
616                self.int_subelements = ( 'bandwidth',)
617                self.float_subelements = ( 'delay',)
618                # The final data structure
619                self.nodes = [ ]
620                self.lans =  [ ]
621                self.topo = { \
622                        'node': self.nodes,\
623                        'lan' : self.lans,\
624                    }
625                self.element = { }  # Current element being created
626                self.chars = ""     # Last text seen
627
628            def end_element(self, name):
629                # After each sub element the contents is added to the current
630                # element or to the appropriate list.
631                if name == 'node':
632                    self.nodes.append(self.element)
633                    self.element = { }
634                elif name == 'lan':
635                    self.lans.append(self.element)
636                    self.element = { }
637                elif name in self.str_subelements:
638                    self.element[name] = self.chars
639                    self.chars = ""
640                elif name in self.int_subelements:
641                    self.element[name] = int(self.chars)
642                    self.chars = ""
643                elif name in self.float_subelements:
644                    self.element[name] = float(self.chars)
645                    self.chars = ""
646
647            def found_chars(self, data):
648                self.chars += data.rstrip()
649
650
651        tp = topo_parse();
652        parser = xml.parsers.expat.ParserCreate()
653        parser.EndElementHandler = tp.end_element
654        parser.CharacterDataHandler = tp.found_chars
655
656        parser.Parse(str)
657
658        return tp.topo
659       
660
661    def genviz(self, topo):
662        """
663        Generate the visualization the virtual topology
664        """
665
666        neato = "/usr/local/bin/neato"
667        # These are used to parse neato output and to create the visualization
668        # file.
669        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
670        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
671                "%s</type></node>"
672
673        try:
674            # Node names
675            nodes = [ n['vname'] for n in topo['node'] ]
676            topo_lans = topo['lan']
677        except KeyError, e:
678            raise service_error(service_error.internal, "Bad topology: %s" %e)
679
680        lans = { }
681        links = { }
682
683        # Walk through the virtual topology, organizing the connections into
684        # 2-node connections (links) and more-than-2-node connections (lans).
685        # When a lan is created, it's added to the list of nodes (there's a
686        # node in the visualization for the lan).
687        for l in topo_lans:
688            if links.has_key(l['vname']):
689                if len(links[l['vname']]) < 2:
690                    links[l['vname']].append(l['vnode'])
691                else:
692                    nodes.append(l['vname'])
693                    lans[l['vname']] = links[l['vname']]
694                    del links[l['vname']]
695                    lans[l['vname']].append(l['vnode'])
696            elif lans.has_key(l['vname']):
697                lans[l['vname']].append(l['vnode'])
698            else:
699                links[l['vname']] = [ l['vnode'] ]
700
701
702        # Open up a temporary file for dot to turn into a visualization
703        try:
704            df, dotname = tempfile.mkstemp()
705            dotfile = os.fdopen(df, 'w')
706        except EnvironmentError:
707            raise service_error(service_error.internal,
708                    "Failed to open file in genviz")
709
710        try:
711            dnull = open('/dev/null', 'w')
712        except EnvironmentError:
713            service_error(service_error.internal,
714                    "Failed to open /dev/null in genviz")
715
716        # Generate a dot/neato input file from the links, nodes and lans
717        try:
718            print >>dotfile, "graph G {"
719            for n in nodes:
720                print >>dotfile, '\t"%s"' % n
721            for l in links.keys():
722                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
723            for l in lans.keys():
724                for n in lans[l]:
725                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
726            print >>dotfile, "}"
727            dotfile.close()
728        except TypeError:
729            raise service_error(service_error.internal,
730                    "Single endpoint link in vtopo")
731        except EnvironmentError:
732            raise service_error(service_error.internal, "Cannot write dot file")
733
734        # Use dot to create a visualization
735        try:
736            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
737                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
738                stderr=dnull, close_fds=True)
739        except EnvironmentError:
740            raise service_error(service_error.internal, 
741                    "Cannot generate visualization: is graphviz available?")
742        dnull.close()
743
744        # Translate dot to vis format
745        vis_nodes = [ ]
746        vis = { 'node': vis_nodes }
747        for line in dot.stdout:
748            m = vis_re.match(line)
749            if m:
750                vn = m.group(1)
751                vis_node = {'name': vn, \
752                        'x': float(m.group(2)),\
753                        'y' : float(m.group(3)),\
754                    }
755                if vn in links.keys() or vn in lans.keys():
756                    vis_node['type'] = 'lan'
757                else:
758                    vis_node['type'] = 'node'
759                vis_nodes.append(vis_node)
760        rv = dot.wait()
761
762        os.remove(dotname)
763        # XXX: graphviz seems to use low return codes for warnings, like
764        # "couldn't find font"
765        if rv < 2 : return vis
766        else: return None
767
768
769    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
770            cert_pwd=None):
771        """
772        Release access to testbed through fedd
773        """
774
775        if not uri and tbmap:
776            uri = tbmap.get(tb, None)
777        if not uri:
778            raise service_error(service_error.server_config, 
779                    "Unknown testbed: %s" % tb)
780
781        if self.local_access.has_key(uri):
782            resp = self.local_access[uri].ReleaseAccess(\
783                    { 'ReleaseAccessRequestBody' : 
784                        {'allocID': {'fedid': aid}},}, 
785                    fedid(file=cert_file))
786            resp = { 'ReleaseAccessResponseBody': resp } 
787        else:
788            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
789                    cert_file, cert_pwd, self.trusted_certs)
790
791        # better error coding
792
793    def remote_ns2topdl(self, uri, desc):
794
795        req = {
796                'description' : { 'ns2description': desc },
797            }
798
799        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
800                self.trusted_certs)
801
802        if r.has_key('Ns2TopdlResponseBody'):
803            r = r['Ns2TopdlResponseBody']
804            ed = r.get('experimentdescription', None)
805            if ed.has_key('topdldescription'):
806                return topdl.Topology(**ed['topdldescription'])
807            else:
808                raise service_error(service_error.protocol, 
809                        "Bad splitter response (no output)")
810        else:
811            raise service_error(service_error.protocol, "Bad splitter response")
812
813    class start_segment:
814        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
815                cert_pwd=None, trusted_certs=None, caller=None,
816                log_collector=None):
817            self.log = log
818            self.debug = debug
819            self.cert_file = cert_file
820            self.cert_pwd = cert_pwd
821            self.trusted_certs = None
822            self.caller = caller
823            self.testbed = testbed
824            self.log_collector = log_collector
825            self.response = None
826            self.node = { }
827            self.proof = None
828
829        def make_map(self, resp):
830            if 'segmentdescription' not in resp  or \
831                    'topdldescription' not in resp['segmentdescription']:
832                self.log.warn('No topology returned from startsegment')
833                return 
834
835            top = topdl.Topology(
836                    **resp['segmentdescription']['topdldescription'])
837
838            for e in top.elements:
839                if isinstance(e, topdl.Computer):
840                    self.node[e.name] = e
841
842        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
843            req = {
844                    'allocID': { 'fedid' : aid }, 
845                    'segmentdescription': { 
846                        'topdldescription': topo.to_dict(),
847                    },
848                }
849
850            if connInfo:
851                req['connection'] = connInfo
852
853            import_svcs = [ s for m in masters.values() \
854                    for s in m if self.testbed in s.importers]
855
856            if import_svcs or self.testbed in masters:
857                req['service'] = []
858
859            for s in import_svcs:
860                for r in s.reqs:
861                    sr = copy.deepcopy(r)
862                    sr['visibility'] = 'import';
863                    req['service'].append(sr)
864
865            for s in masters.get(self.testbed, []):
866                for r in s.reqs:
867                    sr = copy.deepcopy(r)
868                    sr['visibility'] = 'export';
869                    req['service'].append(sr)
870
871            if attrs:
872                req['fedAttr'] = attrs
873
874            try:
875                self.log.debug("Calling StartSegment at %s " % uri)
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                if r.has_key('StartSegmentResponseBody'):
879                    lval = r['StartSegmentResponseBody'].get('allocationLog',
880                            None)
881                    if lval and self.log_collector:
882                        for line in  lval.splitlines(True):
883                            self.log_collector.write(line)
884                    self.make_map(r['StartSegmentResponseBody'])
885                    if 'proof' in r: self.proof = r['proof']
886                    self.response = r
887                else:
888                    raise service_error(service_error.internal, 
889                            "Bad response!?: %s" %r)
890                return True
891            except service_error, e:
892                self.log.error("Start segment failed on %s: %s" % \
893                        (self.testbed, e))
894                return False
895
896
897
898    class terminate_segment:
899        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
900                cert_pwd=None, trusted_certs=None, caller=None):
901            self.log = log
902            self.debug = debug
903            self.cert_file = cert_file
904            self.cert_pwd = cert_pwd
905            self.trusted_certs = None
906            self.caller = caller
907            self.testbed = testbed
908
909        def __call__(self, uri, aid ):
910            req = {
911                    'allocID': {'fedid': aid }, 
912                }
913            self.log.info("Calling terminate segment")
914            try:
915                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
916                        self.trusted_certs)
917                self.log.info("Terminate segment succeeded")
918                return True
919            except service_error, e:
920                self.log.error("Terminate segment failed on %s: %s" % \
921                        (self.testbed, e))
922                return False
923
924    class info_segment(start_segment):
925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
926                cert_pwd=None, trusted_certs=None, caller=None,
927                log_collector=None):
928            experiment_control_local.start_segment.__init__(self, debug, 
929                    log, testbed, cert_file, cert_pwd, trusted_certs, 
930                    caller, log_collector)
931
932        def __call__(self, uri, aid):
933            req = { 'allocID': { 'fedid' : aid } }
934
935            try:
936                self.log.debug("Calling InfoSegment at %s " % uri)
937                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
938                        self.trusted_certs)
939                if r.has_key('InfoSegmentResponseBody'):
940                    self.make_map(r['InfoSegmentResponseBody'])
941                    if 'proof' in r: self.proof = r['proof']
942                    self.response = r
943                else:
944                    raise service_error(service_error.internal, 
945                            "Bad response!?: %s" %r)
946                return True
947            except service_error, e:
948                self.log.error("Info segment failed on %s: %s" % \
949                        (self.testbed, e))
950                return False
951
952    class operation_segment:
953        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
954                cert_pwd=None, trusted_certs=None, caller=None,
955                log_collector=None):
956            self.log = log
957            self.debug = debug
958            self.cert_file = cert_file
959            self.cert_pwd = cert_pwd
960            self.trusted_certs = None
961            self.caller = caller
962            self.testbed = testbed
963            self.status = None
964
965        def __call__(self, uri, aid, op, targets, params):
966            req = { 
967                    'allocID': { 'fedid' : aid },
968                    'operation': op,
969                    'target': targets,
970                    }
971            if params: req['parameter'] = params
972
973
974            try:
975                self.log.debug("Calling OperationSegment at %s " % uri)
976                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
977                        self.trusted_certs)
978                if 'OperationSegmentResponseBody' in r:
979                    r = r['OperationSegmentResponseBody']
980                    if 'status' in r:
981                        self.status = r['status']
982                else:
983                    raise service_error(service_error.internal, 
984                            "Bad response!?: %s" %r)
985                return True
986            except service_error, e:
987                self.log.error("Operation segment failed on %s: %s" % \
988                        (self.testbed, e))
989                return False
990
991    def annotate_topology(self, top, data):
992        # These routines do various parts of the annotation
993        def add_new_names(nl, l):
994            """ add any names in nl to the list in l """
995            for n in nl:
996                if n not in l: l.append(n)
997       
998        def merge_services(ne, e):
999            for ns in ne.service:
1000                # NB: the else is on the for
1001                for s in e.service:
1002                    if ns.name == s.name:
1003                        s.importer = ns.importer
1004                        s.param = ns.param
1005                        s.description = ns.description
1006                        s.status = ns.status
1007                        break
1008                else:
1009                    e.service.append(ns)
1010       
1011        def merge_oses(ne, e):
1012            """
1013            Merge the operating system entries of ne into e
1014            """
1015            for nos in ne.os:
1016                # NB: the else is on the for
1017                for os in e.os:
1018                    if nos.name == os.name:
1019                        os.version = nos.version
1020                        os.version = nos.distribution
1021                        os.version = nos.distributionversion
1022                        for a in nos.attribute:
1023                            if os.get_attribute(a.attribute):
1024                                os.remove_attribute(a.attribute)
1025                            os.set_attribute(a.attribute, a.value)
1026                        break
1027                else:
1028                    # If both nodes have one OS, this is a replacement
1029                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
1030                    else: e.os.append(nos)
1031
1032        # Annotate the topology with embedding info
1033        for e in top.elements:
1034            if isinstance(e, topdl.Computer):
1035                for s in data:
1036                    ne = s.node.get(e.name, None)
1037                    if ne is not None:
1038                        add_new_names(ne.localname, e.localname)
1039                        e.status = ne.status
1040                        merge_services(ne, e)
1041                        add_new_names(ne.operation, e.operation)
1042                        if ne.os: merge_oses(ne, e)
1043                        break
1044
1045
1046
1047    def allocate_resources(self, allocated, masters, eid, expid, 
1048            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1049            attrs=None, connInfo={}, tbmap=None, expcert=None):
1050
1051        started = { }           # Testbeds where a sub-experiment started
1052                                # successfully
1053
1054        # XXX
1055        fail_soft = False
1056
1057        if tbmap is None: tbmap = { }
1058
1059        log = alloc_log or self.log
1060
1061        tp = thread_pool(self.nthreads)
1062        threads = [ ]
1063        starters = [ ]
1064
1065        if expcert:
1066            cert = expcert
1067            pw = None
1068        else:
1069            cert = self.cert_file
1070            pw = self.cert_pwd
1071
1072        for tb in allocated.keys():
1073            # Create and start a thread to start the segment, and save it
1074            # to get the return value later
1075            tb_attrs = copy.copy(attrs)
1076            tp.wait_for_slot()
1077            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1078            base, suffix = split_testbed(tb)
1079            if suffix:
1080                tb_attrs.append({'attribute': 'experiment_name', 
1081                    'value': "%s-%s" % (eid, suffix)})
1082            else:
1083                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1084            if not uri:
1085                raise service_error(service_error.internal, 
1086                        "Unknown testbed %s !?" % tb)
1087
1088            aid = tbparams[tb].allocID
1089            if not aid:
1090                raise service_error(service_error.internal, 
1091                        "No alloc id for testbed %s !?" % tb)
1092
1093            s = self.start_segment(log=log, debug=self.debug,
1094                    testbed=tb, cert_file=cert,
1095                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1096                    caller=self.call_StartSegment,
1097                    log_collector=log_collector)
1098            starters.append(s)
1099            t  = pooled_thread(\
1100                    target=s, name=tb,
1101                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1102                    pdata=tp, trace_file=self.trace_file)
1103            threads.append(t)
1104            t.start()
1105
1106        # Wait until all finish (keep pinging the log, though)
1107        mins = 0
1108        revoked = False
1109        while not tp.wait_for_all_done(60.0):
1110            mins += 1
1111            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1112                    % mins)
1113            if not revoked and \
1114                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1115                # a testbed has failed.  Revoke this experiment's
1116                # synchronizarion values so that sub experiments will not
1117                # deadlock waiting for synchronization that will never happen
1118                self.log.info("A subexperiment has failed to swap in, " + \
1119                        "revoking synch keys")
1120                var_key = "fedid:%s" % expid
1121                for k in self.synch_store.all_keys():
1122                    if len(k) > 45 and k[0:46] == var_key:
1123                        self.synch_store.revoke_key(k)
1124                revoked = True
1125
1126        failed = [ t.getName() for t in threads if not t.rv ]
1127        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1128
1129        # If one failed clean up, unless fail_soft is set
1130        if failed:
1131            if not fail_soft:
1132                tp.clear()
1133                for tb in succeeded:
1134                    # Create and start a thread to stop the segment
1135                    tp.wait_for_slot()
1136                    uri = tbparams[tb].uri
1137                    t  = pooled_thread(\
1138                            target=self.terminate_segment(log=log,
1139                                testbed=tb,
1140                                cert_file=cert, 
1141                                cert_pwd=pw,
1142                                trusted_certs=self.trusted_certs,
1143                                caller=self.call_TerminateSegment),
1144                            args=(uri, tbparams[tb].allocID),
1145                            name=tb,
1146                            pdata=tp, trace_file=self.trace_file)
1147                    t.start()
1148                # Wait until all finish (if any are being stopped)
1149                if succeeded:
1150                    tp.wait_for_all_done()
1151
1152                # release the allocations
1153                for tb in tbparams.keys():
1154                    try:
1155                        self.release_access(tb, tbparams[tb].allocID, 
1156                                tbmap=tbmap, uri=tbparams[tb].uri,
1157                                cert_file=cert, cert_pwd=pw)
1158                    except service_error, e:
1159                        self.log.warn("Error releasing access: %s" % e.desc)
1160                # Remove the placeholder
1161                self.state_lock.acquire()
1162                self.state[eid].status = 'failed'
1163                self.state[eid].updated()
1164                if self.state_filename: self.write_state()
1165                self.state_lock.release()
1166                # Remove the repo dir
1167                self.remove_dirs("%s/%s" %(self.repodir, expid))
1168                # Walk up tmpdir, deleting as we go
1169                if self.cleanup:
1170                    self.remove_dirs(tmpdir)
1171                else:
1172                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1173
1174
1175                log.error("Swap in failed on %s" % ",".join(failed))
1176                return
1177        else:
1178            # Walk through the successes and gather the proofs
1179            proofs = { }
1180            for s in starters:
1181                if s.proof:
1182                    proofs[s.testbed] = s.proof
1183            self.annotate_topology(top, starters)
1184            log.info("[start_segment]: Experiment %s active" % eid)
1185
1186
1187        # Walk up tmpdir, deleting as we go
1188        if self.cleanup:
1189            self.remove_dirs(tmpdir)
1190        else:
1191            log.debug("[start_experiment]: not removing %s" % tmpdir)
1192
1193        # Insert the experiment into our state and update the disk copy.
1194        self.state_lock.acquire()
1195        self.state[expid].status = 'active'
1196        self.state[eid] = self.state[expid]
1197        self.state[eid].top = top
1198        self.state[eid].updated()
1199        # Append startup proofs
1200        for f in self.state[eid].get_all_allocations():
1201            if f.tb in proofs:
1202                f.proof.append(proofs[f.tb])
1203
1204        if self.state_filename: self.write_state()
1205        self.state_lock.release()
1206        return
1207
1208
1209    def add_kit(self, e, kit):
1210        """
1211        Add a Software object created from the list of (install, location)
1212        tuples passed as kit  to the software attribute of an object e.  We
1213        do this enough to break out the code, but it's kind of a hack to
1214        avoid changing the old tuple rep.
1215        """
1216
1217        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1218
1219        if isinstance(e.software, list): e.software.extend(s)
1220        else: e.software = s
1221
1222    def append_experiment_authorization(self, expid, attrs, 
1223            need_state_lock=True):
1224        """
1225        Append the authorization information to system state
1226        """
1227
1228        for p, a in attrs:
1229            self.auth.set_attribute(p, a)
1230        self.auth.save()
1231
1232        if need_state_lock: self.state_lock.acquire()
1233        # XXX: really a no op?
1234        #self.state[expid]['auth'].update(attrs)
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237
1238    def clear_experiment_authorization(self, expid, need_state_lock=True):
1239        """
1240        Attrs is a set of attribute principal pairs that need to be removed
1241        from the authenticator.  Remove them and save the authenticator.
1242        """
1243
1244        if need_state_lock: self.state_lock.acquire()
1245        # XXX: should be a no-op
1246        #if expid in self.state and 'auth' in self.state[expid]:
1247            #for p, a in self.state[expid]['auth']:
1248                #self.auth.unset_attribute(p, a)
1249            #self.state[expid]['auth'] = set()
1250        if self.state_filename: self.write_state()
1251        if need_state_lock: self.state_lock.release()
1252        self.auth.save()
1253
1254
1255    def create_experiment_state(self, fid, req, expid, expcert,
1256            state='starting'):
1257        """
1258        Create the initial entry in the experiment's state.  The expid and
1259        expcert are the experiment's fedid and certifacte that represents that
1260        ID, which are installed in the experiment state.  If the request
1261        includes a suggested local name that is used if possible.  If the local
1262        name is already taken by an experiment owned by this user that has
1263        failed, it is overwritten.  Otherwise new letters are added until a
1264        valid localname is found.  The generated local name is returned.
1265        """
1266
1267        if req.has_key('experimentID') and \
1268                req['experimentID'].has_key('localname'):
1269            overwrite = False
1270            eid = req['experimentID']['localname']
1271            # If there's an old failed experiment here with the same local name
1272            # and accessible by this user, we'll overwrite it, otherwise we'll
1273            # fall through and do the collision avoidance.
1274            old_expid = self.get_experiment_fedid(eid)
1275            if old_expid:
1276                users_experiment = True
1277                try:
1278                    self.check_experiment_access(fid, old_expid)
1279                except service_error, e:
1280                    if e.code == service_error.access: users_experiment = False
1281                    else: raise e
1282                if users_experiment:
1283                    self.state_lock.acquire()
1284                    status = self.state[eid].status
1285                    if status and status == 'failed':
1286                        # remove the old access attributes
1287                        self.clear_experiment_authorization(eid,
1288                                need_state_lock=False)
1289                        overwrite = True
1290                        del self.state[eid]
1291                        del self.state[old_expid]
1292                    self.state_lock.release()
1293                else:
1294                    self.log.info('Experiment %s exists, ' % eid + \
1295                            'but this user cannot access it')
1296            self.state_lock.acquire()
1297            while (self.state.has_key(eid) and not overwrite):
1298                eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305        else:
1306            eid = self.exp_stem
1307            for i in range(0,5):
1308                eid += random.choice(string.ascii_letters)
1309            self.state_lock.acquire()
1310            while (self.state.has_key(eid)):
1311                eid = self.exp_stem
1312                for i in range(0,5):
1313                    eid += random.choice(string.ascii_letters)
1314            # Initial state
1315            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1316                    identity=expcert)
1317            self.state[expid] = self.state[eid]
1318            if self.state_filename: self.write_state()
1319            self.state_lock.release()
1320
1321        # Let users touch the state.  Authorize this fid and the expid itself
1322        # to touch the experiment, as well as allowing th eoverrides.
1323        self.append_experiment_authorization(eid, 
1324                set([(fid, expid), (expid,expid)] + \
1325                        [ (o, expid) for o in self.overrides]))
1326
1327        return eid
1328
1329
1330    def allocate_ips_to_topo(self, top):
1331        """
1332        Add an ip4_address attribute to all the hosts in the topology, based on
1333        the shared substrates on which they sit.  An /etc/hosts file is also
1334        created and returned as a list of hostfiles entries.  We also return
1335        the allocator, because we may need to allocate IPs to portals
1336        (specifically DRAGON portals).
1337        """
1338        subs = sorted(top.substrates, 
1339                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1340                reverse=True)
1341        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1342        ifs = { }
1343        hosts = [ ]
1344
1345        for idx, s in enumerate(subs):
1346            net_size = len(s.interfaces)+2
1347
1348            a = ips.allocate(net_size)
1349            if a :
1350                base, num = a
1351                if num < net_size: 
1352                    raise service_error(service_error.internal,
1353                            "Allocator returned wrong number of IPs??")
1354            else:
1355                raise service_error(service_error.req, 
1356                        "Cannot allocate IP addresses")
1357            mask = ips.min_alloc
1358            while mask < net_size:
1359                mask *= 2
1360
1361            netmask = ((2**32-1) ^ (mask-1))
1362
1363            base += 1
1364            for i in s.interfaces:
1365                i.attribute.append(
1366                        topdl.Attribute('ip4_address', 
1367                            "%s" % ip_addr(base)))
1368                i.attribute.append(
1369                        topdl.Attribute('ip4_netmask', 
1370                            "%s" % ip_addr(int(netmask))))
1371
1372                hname = i.element.name
1373                if ifs.has_key(hname):
1374                    hosts.append("%s\t%s-%s %s-%d" % \
1375                            (ip_addr(base), hname, s.name, hname,
1376                                ifs[hname]))
1377                else:
1378                    ifs[hname] = 0
1379                    hosts.append("%s\t%s-%s %s-%d %s" % \
1380                            (ip_addr(base), hname, s.name, hname,
1381                                ifs[hname], hname))
1382
1383                ifs[hname] += 1
1384                base += 1
1385        return hosts, ips
1386
1387    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1388            tbparam, masters, tbmap, expid=None, expcert=None):
1389        for tb in testbeds:
1390            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1391                    expcert)
1392            allocated[tb] = 1
1393
1394    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1395            expcert=None):
1396        """
1397        Get access to testbed through fedd and set the parameters for that tb
1398        """
1399        def get_export_project(svcs):
1400            """
1401            Look through for the list of federated_service for this testbed
1402            objects for a project_export service, and extract the project
1403            parameter.
1404            """
1405
1406            pe = [s for s in svcs if s.name=='project_export']
1407            if len(pe) == 1:
1408                return pe[0].params.get('project', None)
1409            elif len(pe) == 0:
1410                return None
1411            else:
1412                raise service_error(service_error.req,
1413                        "More than one project export is not supported")
1414
1415        def add_services(svcs, type, slist, keys):
1416            """
1417            Add the given services to slist.  type is import or export.  Also
1418            add a mapping entry from the assigned id to the original service
1419            record.
1420            """
1421            for i, s in enumerate(svcs):
1422                idx = '%s%d' % (type, i)
1423                keys[idx] = s
1424                sr = {'id': idx, 'name': s.name, 'visibility': type }
1425                if s.params:
1426                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1427                            for k, v in s.params.items()]
1428                slist.append(sr)
1429
1430        uri = tbmap.get(testbed_base(tb), None)
1431        if not uri:
1432            raise service_error(service_error.server_config, 
1433                    "Unknown testbed: %s" % tb)
1434
1435        export_svcs = masters.get(tb,[])
1436        import_svcs = [ s for m in masters.values() \
1437                for s in m \
1438                    if tb in s.importers ]
1439
1440        export_project = get_export_project(export_svcs)
1441        # Compose the credential list so that IDs come before attributes
1442        creds = set()
1443        keys = set()
1444        certs = self.auth.get_creds_for_principal(fid)
1445        if expid:
1446            certs.update(self.auth.get_creds_for_principal(expid))
1447        for c in certs:
1448            keys.add(c.issuer_cert())
1449            creds.add(c.attribute_cert())
1450        creds = list(keys) + list(creds)
1451
1452        if expcert: cert, pw = expcert, None
1453        else: cert, pw = self.cert_file, self.cert_pw
1454
1455        # Request credentials
1456        req = {
1457                'abac_credential': creds,
1458            }
1459        # Make the service request from the services we're importing and
1460        # exporting.  Keep track of the export request ids so we can
1461        # collect the resulting info from the access response.
1462        e_keys = { }
1463        if import_svcs or export_svcs:
1464            slist = []
1465            add_services(import_svcs, 'import', slist, e_keys)
1466            add_services(export_svcs, 'export', slist, e_keys)
1467            req['service'] = slist
1468
1469        if self.local_access.has_key(uri):
1470            # Local access call
1471            req = { 'RequestAccessRequestBody' : req }
1472            r = self.local_access[uri].RequestAccess(req, 
1473                    fedid(file=self.cert_file))
1474            r = { 'RequestAccessResponseBody' : r }
1475        else:
1476            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1477
1478        if r.has_key('RequestAccessResponseBody'):
1479            # Through to here we have a valid response, not a fault.
1480            # Access denied is a fault, so something better or worse than
1481            # access denied has happened.
1482            r = r['RequestAccessResponseBody']
1483            self.log.debug("[get_access] Access granted")
1484        else:
1485            raise service_error(service_error.protocol,
1486                        "Bad proxy response")
1487        if 'proof' not in r:
1488            raise service_error(service_error.protocol,
1489                        "Bad access response (no access proof)")
1490
1491        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1492                tb=tb, uri=uri, proof=[r['proof']], 
1493                services=masters.get(tb, None))
1494
1495        # Collect the responses corresponding to the services this testbed
1496        # exports.  These will be the service requests that we will include in
1497        # the start segment requests (with appropriate visibility values) to
1498        # import and export the segments.
1499        for s in r.get('service', []):
1500            id = s.get('id', None)
1501            # Note that this attaches the response to the object in the masters
1502            # data structure.  (The e_keys index disappears when this fcn
1503            # returns)
1504            if id and id in e_keys:
1505                e_keys[id].reqs.append(s)
1506
1507        # Add attributes to parameter space.  We don't allow attributes to
1508        # overlay any parameters already installed.
1509        for a in r.get('fedAttr', []):
1510            try:
1511                if a['attribute']:
1512                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1513            except KeyError:
1514                self.log.error("Bad attribute in response: %s" % a)
1515
1516
1517    def split_topology(self, top, topo, testbeds):
1518        """
1519        Create the sub-topologies that are needed for experiment instantiation.
1520        """
1521        for tb in testbeds:
1522            topo[tb] = top.clone()
1523            # copy in for loop allows deletions from the original
1524            for e in [ e for e in topo[tb].elements]:
1525                etb = e.get_attribute('testbed')
1526                # NB: elements without a testbed attribute won't appear in any
1527                # sub topologies. 
1528                if not etb or etb != tb:
1529                    for i in e.interface:
1530                        for s in i.subs:
1531                            try:
1532                                s.interfaces.remove(i)
1533                            except ValueError:
1534                                raise service_error(service_error.internal,
1535                                        "Can't remove interface??")
1536                    topo[tb].elements.remove(e)
1537            topo[tb].make_indices()
1538
1539    def confirm_software(self, top):
1540        """
1541        Make sure that the software to be loaded in the topo is all available
1542        before we begin making access requests, etc.  This is a subset of
1543        wrangle_software.
1544        """
1545        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1546        pkgs.update([x.location for e in top.elements for x in e.software])
1547
1548        for pkg in pkgs:
1549            loc = pkg
1550
1551            scheme, host, path = urlparse(loc)[0:3]
1552            dest = os.path.basename(path)
1553            if not scheme:
1554                if not loc.startswith('/'):
1555                    loc = "/%s" % loc
1556                loc = "file://%s" %loc
1557            # NB: if scheme was found, loc == pkg
1558            try:
1559                u = urlopen(loc)
1560                u.close()
1561            except Exception, e:
1562                raise service_error(service_error.req, 
1563                        "Cannot open %s: %s" % (loc, e))
1564        return True
1565
1566    def wrangle_software(self, expid, top, topo, tbparams):
1567        """
1568        Copy software out to the repository directory, allocate permissions and
1569        rewrite the segment topologies to look for the software in local
1570        places.
1571        """
1572
1573        # Copy the rpms and tarfiles to a distribution directory from
1574        # which the federants can retrieve them
1575        linkpath = "%s/software" %  expid
1576        softdir ="%s/%s" % ( self.repodir, linkpath)
1577        softmap = { }
1578
1579        # self.fedkit and self.gateway kit are lists of tuples of
1580        # (install_location, download_location) this extracts the download
1581        # locations.
1582        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1583        pkgs.update([x.location for e in top.elements for x in e.software])
1584        try:
1585            os.makedirs(softdir)
1586        except EnvironmentError, e:
1587            raise service_error(
1588                    "Cannot create software directory: %s" % e)
1589        # The actual copying.  Everything's converted into a url for copying.
1590        auth_attrs = set()
1591        for pkg in pkgs:
1592            loc = pkg
1593
1594            scheme, host, path = urlparse(loc)[0:3]
1595            dest = os.path.basename(path)
1596            if not scheme:
1597                if not loc.startswith('/'):
1598                    loc = "/%s" % loc
1599                loc = "file://%s" %loc
1600            # NB: if scheme was found, loc == pkg
1601            try:
1602                u = urlopen(loc)
1603            except Exception, e:
1604                raise service_error(service_error.req, 
1605                        "Cannot open %s: %s" % (loc, e))
1606            try:
1607                f = open("%s/%s" % (softdir, dest) , "w")
1608                self.log.debug("Writing %s/%s" % (softdir,dest) )
1609                data = u.read(4096)
1610                while data:
1611                    f.write(data)
1612                    data = u.read(4096)
1613                f.close()
1614                u.close()
1615            except Exception, e:
1616                raise service_error(service_error.internal,
1617                        "Could not copy %s: %s" % (loc, e))
1618            path = re.sub("/tmp", "", linkpath)
1619            # XXX
1620            softmap[pkg] = \
1621                    "%s/%s/%s" %\
1622                    ( self.repo_url, path, dest)
1623
1624            # Allow the individual segments to access the software by assigning
1625            # an attribute to each testbed allocation that encodes the data to
1626            # be released.  This expression collects the data for each run of
1627            # the loop.
1628            auth_attrs.update([
1629                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1630                        for tb in tbparams.keys()])
1631
1632        self.append_experiment_authorization(expid, auth_attrs)
1633
1634        # Convert the software locations in the segments into the local
1635        # copies on this host
1636        for soft in [ s for tb in topo.values() \
1637                for e in tb.elements \
1638                    if getattr(e, 'software', False) \
1639                        for s in e.software ]:
1640            if softmap.has_key(soft.location):
1641                soft.location = softmap[soft.location]
1642
1643
1644    def new_experiment(self, req, fid):
1645        """
1646        The external interface to empty initial experiment creation called from
1647        the dispatcher.
1648
1649        Creates a working directory, splits the incoming description using the
1650        splitter script and parses out the avrious subsections using the
1651        lcasses above.  Once each sub-experiment is created, use pooled threads
1652        to instantiate them and start it all up.
1653        """
1654        req = req.get('NewRequestBody', None)
1655        if not req:
1656            raise service_error(service_error.req,
1657                    "Bad request format (no NewRequestBody)")
1658
1659        if self.auth.import_credentials(data_list=req.get('credential', [])):
1660            self.auth.save()
1661
1662        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1663                with_proof=True)
1664
1665        if not access_ok:
1666            raise service_error(service_error.access, "New access denied",
1667                    proof=[proof])
1668
1669        try:
1670            tmpdir = tempfile.mkdtemp(prefix="split-")
1671        except EnvironmentError:
1672            raise service_error(service_error.internal, "Cannot create tmp dir")
1673
1674        try:
1675            access_user = self.accessdb[fid]
1676        except KeyError:
1677            raise service_error(service_error.internal,
1678                    "Access map and authorizer out of sync in " + \
1679                            "new_experiment for fedid %s"  % fid)
1680
1681        # Generate an ID for the experiment (slice) and a certificate that the
1682        # allocator can use to prove they own it.  We'll ship it back through
1683        # the encrypted connection.  If the requester supplied one, use it.
1684        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1685            expcert = req['experimentAccess']['X509']
1686            expid = fedid(certstr=expcert)
1687            self.state_lock.acquire()
1688            if expid in self.state:
1689                self.state_lock.release()
1690                raise service_error(service_error.req, 
1691                        'fedid %s identifies an existing experiment' % expid)
1692            self.state_lock.release()
1693        else:
1694            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1695
1696        #now we're done with the tmpdir, and it should be empty
1697        if self.cleanup:
1698            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1699            os.rmdir(tmpdir)
1700        else:
1701            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1702
1703        eid = self.create_experiment_state(fid, req, expid, expcert, 
1704                state='empty')
1705
1706        rv = {
1707                'experimentID': [
1708                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1709                ],
1710                'experimentStatus': 'empty',
1711                'experimentAccess': { 'X509' : expcert },
1712                'proof': proof.to_dict(),
1713            }
1714
1715        return rv
1716
1717    # create_experiment sub-functions
1718
1719    @staticmethod
1720    def get_experiment_key(req, field='experimentID'):
1721        """
1722        Parse the experiment identifiers out of the request (the request body
1723        tag has been removed).  Specifically this pulls either the fedid or the
1724        localname out of the experimentID field.  A fedid is preferred.  If
1725        neither is present or the request does not contain the fields,
1726        service_errors are raised.
1727        """
1728        # Get the experiment access
1729        exp = req.get(field, None)
1730        if exp:
1731            if exp.has_key('fedid'):
1732                key = exp['fedid']
1733            elif exp.has_key('localname'):
1734                key = exp['localname']
1735            else:
1736                raise service_error(service_error.req, "Unknown lookup type")
1737        else:
1738            raise service_error(service_error.req, "No request?")
1739
1740        return key
1741
1742    def get_experiment_ids_and_start(self, key, tmpdir):
1743        """
1744        Get the experiment name, id and access certificate from the state, and
1745        set the experiment state to 'starting'.  returns a triple (fedid,
1746        localname, access_cert_file). The access_cert_file is a copy of the
1747        contents of the access certificate, created in the tempdir with
1748        restricted permissions.  If things are confused, raise an exception.
1749        """
1750
1751        expid = eid = None
1752        self.state_lock.acquire()
1753        if key in self.state:
1754            exp = self.state[key]
1755            exp.status = "starting"
1756            exp.updated()
1757            expid = exp.fedid
1758            eid = exp.localname
1759            expcert = exp.identity
1760        self.state_lock.release()
1761
1762        # make a protected copy of the access certificate so the experiment
1763        # controller can act as the experiment principal.
1764        if expcert:
1765            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1766            if not expcert_file:
1767                raise service_error(service_error.internal, 
1768                        "Cannot create temp cert file?")
1769        else:
1770            expcert_file = None
1771
1772        return (eid, expid, expcert_file)
1773
1774    def get_topology(self, req, tmpdir):
1775        """
1776        Get the ns2 content and put it into a file for parsing.  Call the local
1777        or remote parser and return the topdl.Topology.  Errors result in
1778        exceptions.  req is the request and tmpdir is a work directory.
1779        """
1780
1781        # The tcl parser needs to read a file so put the content into that file
1782        descr=req.get('experimentdescription', None)
1783        if descr:
1784            if 'ns2description' in descr:
1785                file_content=descr['ns2description']
1786            elif 'topdldescription' in descr:
1787                return topdl.Topology(**descr['topdldescription'])
1788            else:
1789                raise service_error(service_error.req, 
1790                        'Unknown experiment description type')
1791        else:
1792            raise service_error(service_error.req, "No experiment description")
1793
1794
1795        if self.splitter_url:
1796            self.log.debug("Calling remote topdl translator at %s" % \
1797                    self.splitter_url)
1798            top = self.remote_ns2topdl(self.splitter_url, file_content)
1799        else:
1800            tclfile = os.path.join(tmpdir, "experiment.tcl")
1801            if file_content:
1802                try:
1803                    f = open(tclfile, 'w')
1804                    f.write(file_content)
1805                    f.close()
1806                except EnvironmentError:
1807                    raise service_error(service_error.internal,
1808                            "Cannot write temp experiment description")
1809            else:
1810                raise service_error(service_error.req, 
1811                        "Only ns2descriptions supported")
1812            pid = "dummy"
1813            gid = "dummy"
1814            eid = "dummy"
1815
1816            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1817                str(self.muxmax), '-m', 'dummy']
1818
1819            tclcmd.extend([pid, gid, eid, tclfile])
1820
1821            self.log.debug("running local splitter %s", " ".join(tclcmd))
1822            # This is just fantastic.  As a side effect the parser copies
1823            # tb_compat.tcl into the current directory, so that directory
1824            # must be writable by the fedd user.  Doing this in the
1825            # temporary subdir ensures this is the case.
1826            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1827                    cwd=tmpdir)
1828            split_data = tclparser.stdout
1829
1830            top = topdl.topology_from_xml(file=split_data, top="experiment")
1831            os.remove(tclfile)
1832
1833        return top
1834
1835    def get_testbed_services(self, req, testbeds):
1836        """
1837        Parse the services section of the request into two dicts mapping
1838        testbed to lists of federated_service objects.  The first dict maps all
1839        exporters of services to those service objects, the second maps
1840        testbeds to service objects only for services requiring portals.
1841        """
1842        # We construct both dicts here because deriving the second is more
1843        # comples than it looks - both the keys and lists can differ, and it's
1844        # much easier to generate both in one pass.
1845        masters = { }
1846        pmasters = { }
1847        for s in req.get('service', []):
1848            # If this is a service request with the importall field
1849            # set, fill it out.
1850
1851            if s.get('importall', False):
1852                s['import'] = [ tb for tb in testbeds \
1853                        if tb not in s.get('export',[])]
1854                del s['importall']
1855
1856            # Add the service to masters
1857            for tb in s.get('export', []):
1858                if s.get('name', None):
1859
1860                    params = { }
1861                    for a in s.get('fedAttr', []):
1862                        params[a.get('attribute', '')] = a.get('value','')
1863
1864                    fser = federated_service(name=s['name'],
1865                            exporter=tb, importers=s.get('import',[]),
1866                            params=params)
1867                    if fser.name == 'hide_hosts' \
1868                            and 'hosts' not in fser.params:
1869                        fser.params['hosts'] = \
1870                                ",".join(tb_hosts.get(fser.exporter, []))
1871                    if tb in masters: masters[tb].append(fser)
1872                    else: masters[tb] = [fser]
1873
1874                    if fser.portal:
1875                        if tb in pmasters: pmasters[tb].append(fser)
1876                        else: pmasters[tb] = [fser]
1877                else:
1878                    self.log.error('Testbed service does not have name " + \
1879                            "and importers')
1880        return masters, pmasters
1881
1882    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1883        """
1884        Create the ssh keys necessary for interconnecting the portal nodes and
1885        the global hosts file for letting each segment know about the IP
1886        addresses in play.  Save these into the repo.  Add attributes to the
1887        autorizer allowing access controllers to download them and return a set
1888        of attributes that inform the segments where to find this stuff.  May
1889        raise service_errors in if there are problems.
1890        """
1891        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1892        gw_secretkey_base = "fed.%s" % self.ssh_type
1893        keydir = os.path.join(tmpdir, 'keys')
1894        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1895        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1896
1897        try:
1898            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1899        except ValueError:
1900            raise service_error(service_error.server_config, 
1901                    "Bad key type (%s)" % self.ssh_type)
1902
1903        self.generate_seer_certs(keydir)
1904
1905        # Copy configuration files into the remote file store
1906        # The config urlpath
1907        configpath = "/%s/config" % expid
1908        # The config file system location
1909        configdir ="%s%s" % ( self.repodir, configpath)
1910        try:
1911            os.makedirs(configdir)
1912        except EnvironmentError, e:
1913            raise service_error(service_error.internal,
1914                    "Cannot create config directory: %s" % e)
1915        try:
1916            f = open("%s/hosts" % configdir, "w")
1917            print >> f, string.join(hosts, '\n')
1918            f.close()
1919        except EnvironmentError, e:
1920            raise service_error(service_error.internal, 
1921                    "Cannot write hosts file: %s" % e)
1922        try:
1923            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1924            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1925            copy_file(os.path.join(keydir, 'ca.pem'), 
1926                    os.path.join(configdir, 'ca.pem'))
1927            copy_file(os.path.join(keydir, 'node.pem'), 
1928                    os.path.join(configdir, 'node.pem'))
1929        except EnvironmentError, e:
1930            raise service_error(service_error.internal, 
1931                    "Cannot copy keyfiles: %s" % e)
1932
1933        # Allow the individual testbeds to access the configuration files,
1934        # again by setting an attribute for the relevant pathnames on each
1935        # allocation principal.  Yeah, that's a long list comprehension.
1936        self.append_experiment_authorization(expid, set([
1937            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1938                    for tb in tbparams.keys() \
1939                        for f in ("hosts", 'ca.pem', 'node.pem', 
1940                            gw_secretkey_base, gw_pubkey_base)]))
1941
1942        attrs = [ 
1943                {
1944                    'attribute': 'ssh_pubkey', 
1945                    'value': '%s/%s/config/%s' % \
1946                            (self.repo_url, expid, gw_pubkey_base)
1947                },
1948                {
1949                    'attribute': 'ssh_secretkey', 
1950                    'value': '%s/%s/config/%s' % \
1951                            (self.repo_url, expid, gw_secretkey_base)
1952                },
1953                {
1954                    'attribute': 'hosts', 
1955                    'value': '%s/%s/config/hosts' % \
1956                            (self.repo_url, expid)
1957                },
1958                {
1959                    'attribute': 'seer_ca_pem', 
1960                    'value': '%s/%s/config/%s' % \
1961                            (self.repo_url, expid, 'ca.pem')
1962                },
1963                {
1964                    'attribute': 'seer_node_pem', 
1965                    'value': '%s/%s/config/%s' % \
1966                            (self.repo_url, expid, 'node.pem')
1967                },
1968            ]
1969        return attrs
1970
1971
1972    def get_vtopo(self, req, fid):
1973        """
1974        Return the stored virtual topology for this experiment
1975        """
1976        rv = None
1977        state = None
1978
1979        req = req.get('VtopoRequestBody', None)
1980        if not req:
1981            raise service_error(service_error.req,
1982                    "Bad request format (no VtopoRequestBody)")
1983        exp = req.get('experiment', None)
1984        if exp:
1985            if exp.has_key('fedid'):
1986                key = exp['fedid']
1987                keytype = "fedid"
1988            elif exp.has_key('localname'):
1989                key = exp['localname']
1990                keytype = "localname"
1991            else:
1992                raise service_error(service_error.req, "Unknown lookup type")
1993        else:
1994            raise service_error(service_error.req, "No request?")
1995
1996        proof = self.check_experiment_access(fid, key)
1997
1998        self.state_lock.acquire()
1999        # XXX: this needs to be recalculated
2000        if key in self.state:
2001            if self.state[key].top is not None:
2002                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2003                rv = { 'experiment' : {keytype: key },
2004                        'vtopo': vtopo,
2005                        'proof': proof.to_dict(), 
2006                    }
2007            else:
2008                state = self.state[key].status
2009        self.state_lock.release()
2010
2011        if rv: return rv
2012        else: 
2013            if state:
2014                raise service_error(service_error.partial, 
2015                        "Not ready: %s" % state)
2016            else:
2017                raise service_error(service_error.req, "No such experiment")
2018
2019    def get_vis(self, req, fid):
2020        """
2021        Return the stored visualization for this experiment
2022        """
2023        rv = None
2024        state = None
2025
2026        req = req.get('VisRequestBody', None)
2027        if not req:
2028            raise service_error(service_error.req,
2029                    "Bad request format (no VisRequestBody)")
2030        exp = req.get('experiment', None)
2031        if exp:
2032            if exp.has_key('fedid'):
2033                key = exp['fedid']
2034                keytype = "fedid"
2035            elif exp.has_key('localname'):
2036                key = exp['localname']
2037                keytype = "localname"
2038            else:
2039                raise service_error(service_error.req, "Unknown lookup type")
2040        else:
2041            raise service_error(service_error.req, "No request?")
2042
2043        proof = self.check_experiment_access(fid, key)
2044
2045        self.state_lock.acquire()
2046        # Generate the visualization
2047        if key in self.state:
2048            if self.state[key].top is not None:
2049                try:
2050                    vis = self.genviz(
2051                            topdl.topology_to_vtopo(self.state[key].top))
2052                except service_error, e:
2053                    self.state_lock.release()
2054                    raise e
2055                rv =  { 'experiment' : {keytype: key },
2056                        'vis': vis,
2057                        'proof': proof.to_dict(), 
2058                        }
2059            else:
2060                state = self.state[key].status
2061        self.state_lock.release()
2062
2063        if rv: return rv
2064        else:
2065            if state:
2066                raise service_error(service_error.partial, 
2067                        "Not ready: %s" % state)
2068            else:
2069                raise service_error(service_error.req, "No such experiment")
2070
2071   
2072    def save_federant_information(self, allocated, tbparams, eid, top):
2073        """
2074        Store the various data that have changed in the experiment state
2075        between when it was started and the beginning of resource allocation.
2076        This is basically the information about each local allocation.  This
2077        fills in the values of the placeholder allocation in the state.  It
2078        also collects the access proofs and returns them as dicts for a
2079        response message.
2080        """
2081        self.state_lock.acquire()
2082        exp = self.state[eid]
2083        exp.top = top.clone()
2084        # save federant information
2085        for k in allocated.keys():
2086            exp.add_allocation(tbparams[k])
2087            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2088                type="testbed", localname=[k], 
2089                service=[ s.to_topdl() for s in tbparams[k].services]))
2090
2091        # Access proofs for the response message
2092        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2093                    for p in tbparams[k].proof]
2094        exp.updated()
2095        if self.state_filename: 
2096            self.write_state()
2097        self.state_lock.release()
2098        return proofs
2099
2100    def clear_placeholder(self, eid, expid, tmpdir):
2101        """
2102        Clear the placeholder and remove any allocated temporary dir.
2103        """
2104
2105        self.state_lock.acquire()
2106        del self.state[eid]
2107        del self.state[expid]
2108        if self.state_filename: self.write_state()
2109        self.state_lock.release()
2110        if tmpdir and self.cleanup:
2111            self.remove_dirs(tmpdir)
2112
2113    # end of create_experiment sub-functions
2114
2115    def create_experiment(self, req, fid):
2116        """
2117        The external interface to experiment creation called from the
2118        dispatcher.
2119
2120        Creates a working directory, splits the incoming description using the
2121        splitter script and parses out the various subsections using the
2122        classes above.  Once each sub-experiment is created, use pooled threads
2123        to instantiate them and start it all up.
2124        """
2125
2126        req = req.get('CreateRequestBody', None)
2127        if req:
2128            key = self.get_experiment_key(req)
2129        else:
2130            raise service_error(service_error.req,
2131                    "Bad request format (no CreateRequestBody)")
2132
2133        # Import information from the requester
2134        if self.auth.import_credentials(data_list=req.get('credential', [])):
2135            self.auth.save()
2136
2137        # Make sure that the caller can talk to us
2138        proof = self.check_experiment_access(fid, key)
2139
2140        # Install the testbed map entries supplied with the request into a copy
2141        # of the testbed map.
2142        tbmap = dict(self.tbmap)
2143        tbactive = set(self.tbactive)
2144        for m in req.get('testbedmap', []):
2145            if 'testbed' in m and 'uri' in m:
2146                tbmap[m['testbed']] = m['uri']
2147                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2148
2149        # a place to work
2150        try:
2151            tmpdir = tempfile.mkdtemp(prefix="split-")
2152            os.mkdir(tmpdir+"/keys")
2153        except EnvironmentError:
2154            raise service_error(service_error.internal, "Cannot create tmp dir")
2155
2156        tbparams = { }
2157
2158        eid, expid, expcert_file = \
2159                self.get_experiment_ids_and_start(key, tmpdir)
2160
2161        # This catches exceptions to clear the placeholder if necessary
2162        try: 
2163            if not (eid and expid):
2164                raise service_error(service_error.internal, 
2165                        "Cannot find local experiment info!?")
2166
2167            top = self.get_topology(req, tmpdir)
2168            self.confirm_software(top)
2169            # Assign the IPs
2170            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2171            # Find the testbeds to look up
2172            tb_hosts = { }
2173            testbeds = [ ]
2174            for e in top.elements:
2175                if isinstance(e, topdl.Computer):
2176                    tb = e.get_attribute('testbed') or 'default'
2177                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2178                    else: 
2179                        tb_hosts[tb] = [ e.name ]
2180                        testbeds.append(tb)
2181
2182            masters, pmasters = self.get_testbed_services(req, testbeds)
2183            allocated = { }         # Testbeds we can access
2184            topo ={ }               # Sub topologies
2185            connInfo = { }          # Connection information
2186
2187            self.split_topology(top, topo, testbeds)
2188
2189            self.get_access_to_testbeds(testbeds, fid, allocated, 
2190                    tbparams, masters, tbmap, expid, expcert_file)
2191
2192            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2193
2194            part = experiment_partition(self.auth, self.store_url, tbmap,
2195                    self.muxmax, self.direct_transit, tbactive)
2196            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2197                    connInfo, expid)
2198
2199            auth_attrs = set()
2200            # Now get access to the dynamic testbeds (those added above)
2201            for tb in [ t for t in topo if t not in allocated]:
2202                self.get_access(tb, tbparams, fid, masters, tbmap, 
2203                        expid, expcert_file)
2204                allocated[tb] = 1
2205                store_keys = topo[tb].get_attribute('store_keys')
2206                # Give the testbed access to keys it exports or imports
2207                if store_keys:
2208                    auth_attrs.update(set([
2209                        (tbparams[tb].allocID, sk) \
2210                                for sk in store_keys.split(" ")]))
2211
2212            if auth_attrs:
2213                self.append_experiment_authorization(expid, auth_attrs)
2214
2215            # transit and disconnected testbeds may not have a connInfo entry.
2216            # Fill in the blanks.
2217            for t in allocated.keys():
2218                if not connInfo.has_key(t):
2219                    connInfo[t] = { }
2220
2221            self.wrangle_software(expid, top, topo, tbparams)
2222
2223            proofs = self.save_federant_information(allocated, tbparams, 
2224                    eid, top)
2225        except service_error, e:
2226            # If something goes wrong in the parse (usually an access error)
2227            # clear the placeholder state.  From here on out the code delays
2228            # exceptions.  Failing at this point returns a fault to the remote
2229            # caller.
2230            self.clear_placeholder(eid, expid, tmpdir)
2231            raise e
2232
2233        # Start the background swapper and return the starting state.  From
2234        # here on out, the state will stick around a while.
2235
2236        # Create a logger that logs to the experiment's state object as well as
2237        # to the main log file.
2238        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2239        alloc_collector = self.list_log(self.state[eid].log)
2240        h = logging.StreamHandler(alloc_collector)
2241        # XXX: there should be a global one of these rather than repeating the
2242        # code.
2243        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2244                    '%d %b %y %H:%M:%S'))
2245        alloc_log.addHandler(h)
2246
2247        # Start a thread to do the resource allocation
2248        t  = Thread(target=self.allocate_resources,
2249                args=(allocated, masters, eid, expid, tbparams, 
2250                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2251                    connInfo, tbmap, expcert_file),
2252                name=eid)
2253        t.start()
2254
2255        rv = {
2256                'experimentID': [
2257                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2258                ],
2259                'experimentStatus': 'starting',
2260                'proof': [ proof.to_dict() ] + proofs,
2261            }
2262
2263        return rv
2264   
2265    def get_experiment_fedid(self, key):
2266        """
2267        find the fedid associated with the localname key in the state database.
2268        """
2269
2270        rv = None
2271        self.state_lock.acquire()
2272        if key in self.state:
2273            rv = self.state[key].fedid
2274        self.state_lock.release()
2275        return rv
2276
2277    def check_experiment_access(self, fid, key):
2278        """
2279        Confirm that the fid has access to the experiment.  Though a request
2280        may be made in terms of a local name, the access attribute is always
2281        the experiment's fedid.
2282        """
2283        if not isinstance(key, fedid):
2284            key = self.get_experiment_fedid(key)
2285
2286        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2287
2288        if access_ok:
2289            return proof
2290        else:
2291            raise service_error(service_error.access, "Access Denied",
2292                proof)
2293
2294
2295    def get_handler(self, path, fid):
2296        """
2297        Perhaps surprisingly named, this function handles HTTP GET requests to
2298        this server (SOAP requests are POSTs).
2299        """
2300        self.log.info("Get handler %s %s" % (path, fid))
2301        # XXX: log proofs?
2302        if self.auth.check_attribute(fid, path):
2303            return ("%s/%s" % (self.repodir, path), "application/binary")
2304        else:
2305            return (None, None)
2306
2307    def update_info(self, key, force=False):
2308        top = None
2309        self.state_lock.acquire()
2310        if key in self.state:
2311            if force or self.state[key].older_than(self.info_cache_limit):
2312                top = self.state[key].top
2313                if top is not None: top = top.clone()
2314                d1, info_params, cert, d2 = \
2315                        self.get_segment_info(self.state[key], need_lock=False)
2316        self.state_lock.release()
2317
2318        if top is None: return
2319
2320        try:
2321            tmpdir = tempfile.mkdtemp(prefix="info-")
2322        except EnvironmentError:
2323            raise service_error(service_error.internal, 
2324                    "Cannot create tmp dir")
2325        cert_file = self.make_temp_certfile(cert, tmpdir)
2326
2327        data = []
2328        try:
2329            for k, (uri, aid) in info_params.items():
2330                info=self.info_segment(log=self.log, testbed=uri,
2331                            cert_file=cert_file, cert_pwd=None,
2332                            trusted_certs=self.trusted_certs,
2333                            caller=self.call_InfoSegment)
2334                info(uri, aid)
2335                data.append(info)
2336        # Clean up the tmpdir no matter what
2337        finally:
2338            if tmpdir: self.remove_dirs(tmpdir)
2339
2340        self.annotate_topology(top, data)
2341        self.state_lock.acquire()
2342        if key in self.state:
2343            self.state[key].top = top
2344            self.state[key].updated()
2345            if self.state_filename: self.write_state()
2346        self.state_lock.release()
2347
2348   
2349    def get_info(self, req, fid):
2350        """
2351        Return all the stored info about this experiment
2352        """
2353        rv = None
2354
2355        req = req.get('InfoRequestBody', None)
2356        if not req:
2357            raise service_error(service_error.req,
2358                    "Bad request format (no InfoRequestBody)")
2359        exp = req.get('experiment', None)
2360        legacy = req.get('legacy', False)
2361        fresh = req.get('fresh', False)
2362        if exp:
2363            if exp.has_key('fedid'):
2364                key = exp['fedid']
2365                keytype = "fedid"
2366            elif exp.has_key('localname'):
2367                key = exp['localname']
2368                keytype = "localname"
2369            else:
2370                raise service_error(service_error.req, "Unknown lookup type")
2371        else:
2372            raise service_error(service_error.req, "No request?")
2373
2374        proof = self.check_experiment_access(fid, key)
2375
2376        self.update_info(key, fresh)
2377
2378        self.state_lock.acquire()
2379        if self.state.has_key(key):
2380            rv = self.state[key].get_info()
2381            # Copy the topo if we need legacy annotations
2382            if legacy:
2383                top = self.state[key].top
2384                if top is not None: top = top.clone()
2385        self.state_lock.release()
2386
2387        # If the legacy visualization and topology representations are
2388        # requested, calculate them and add them to the return.
2389        if legacy and rv is not None:
2390            if top is not None:
2391                vtopo = topdl.topology_to_vtopo(top)
2392                if vtopo is not None:
2393                    rv['vtopo'] = vtopo
2394                    try:
2395                        vis = self.genviz(vtopo)
2396                    except service_error, e:
2397                        self.log.debug('Problem generating visualization: %s' \
2398                                % e)
2399                        vis = None
2400                    if vis is not None:
2401                        rv['vis'] = vis
2402        if rv:
2403            rv['proof'] = proof.to_dict()
2404            return rv
2405        else:
2406            raise service_error(service_error.req, "No such experiment")
2407
2408    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2409            results):
2410        """
2411        Call OperateSegment on multiple testbeds and gather the results.
2412        op_params contains the parameters needed to contact that testbed, cert
2413        is a certificate containing the fedid to use, op is the operation,
2414        testbeds is a dict mapping testbed name to targets in that testbed,
2415        params are the parameters to include a,d results is a growing list of
2416        the results of the calls.
2417        """
2418        try:
2419            tmpdir = tempfile.mkdtemp(prefix="info-")
2420        except EnvironmentError:
2421            raise service_error(service_error.internal, 
2422                    "Cannot create tmp dir")
2423        cert_file = self.make_temp_certfile(cert, tmpdir)
2424
2425        try:
2426            for tb, targets in testbeds.items():
2427                if tb in op_params:
2428                    uri, aid = op_params[tb]
2429                    operate=self.operation_segment(log=self.log, testbed=uri,
2430                                cert_file=cert_file, cert_pwd=None,
2431                                trusted_certs=self.trusted_certs,
2432                                caller=self.call_OperationSegment)
2433                    if operate(uri, aid, op, targets, params):
2434                        if operate.status is not None:
2435                            results.extend(operate.status)
2436                            continue
2437                # Something went wrong in a weird way.  Add statuses
2438                # that reflect that to results
2439                for t in targets:
2440                    results.append(operation_status(t, 
2441                        operation_status.federant,
2442                        'Unexpected error on %s' % tb))
2443        # Clean up the tmpdir no matter what
2444        finally:
2445            if tmpdir: self.remove_dirs(tmpdir)
2446
2447    def do_operation(self, req, fid):
2448        """
2449        Find the testbeds holding each target and ask them to carry out the
2450        operation.  Return the statuses.
2451        """
2452        # Map an element to the testbed containing it
2453        def element_to_tb(e):
2454            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2455            elif isinstance(e, topdl.Testbed): return e.name
2456            else: return None
2457        # If d is an operation_status object, make it a dict
2458        def make_dict(d):
2459            if isinstance(d, dict): return d
2460            elif isinstance(d, operation_status): return d.to_dict()
2461            else: return { }
2462
2463        def element_name(e):
2464            if isinstance(e, topdl.Computer): return e.name
2465            elif isinstance(e, topdl.Testbed): 
2466                if e.localname: return e.localname[0]
2467                else: return None
2468            else: return None
2469
2470        req = req.get('OperationRequestBody', None)
2471        if not req:
2472            raise service_error(service_error.req,
2473                    "Bad request format (no OperationRequestBody)")
2474        exp = req.get('experiment', None)
2475        op = req.get('operation', None)
2476        targets = set(req.get('target', []))
2477        params = req.get('parameter', None)
2478
2479        if exp:
2480            if 'fedid' in exp:
2481                key = exp['fedid']
2482                keytype = "fedid"
2483            elif 'localname' in exp:
2484                key = exp['localname']
2485                keytype = "localname"
2486            else:
2487                raise service_error(service_error.req, "Unknown lookup type")
2488        else:
2489            raise service_error(service_error.req, "No request?")
2490
2491        if op is None or not targets:
2492            raise service_error(service_error.req, "No request?")
2493
2494        proof = self.check_experiment_access(fid, key)
2495        self.state_lock.acquire()
2496        if key in self.state:
2497            d1, op_params, cert, d2 = \
2498                    self.get_segment_info(self.state[key], need_lock=False,
2499                            key='tb')
2500            top = self.state[key].top
2501            if top is not None:
2502                top = top.clone()
2503        self.state_lock.release()
2504
2505        if top is None:
2506            raise service_error(service_error.partial, "No topology yet", 
2507                    proof=proof)
2508
2509        testbeds = { }
2510        results = []
2511        for e in top.elements:
2512            ename = element_name(e)
2513            if ename in targets:
2514                tb = element_to_tb(e)
2515                targets.remove(ename)
2516                if tb is not None:
2517                    if tb in testbeds: testbeds[tb].append(ename)
2518                    else: testbeds[tb] = [ ename ]
2519                else:
2520                    results.append(operation_status(e.name, 
2521                        code=operation_status.no_target, 
2522                        description='Cannot map target to testbed'))
2523
2524        for t in targets:
2525            results.append(operation_status(t, operation_status.no_target))
2526
2527        self.operate_on_segments(op_params, cert, op, testbeds, params,
2528                results)
2529
2530        return { 
2531                'experiment': exp, 
2532                'status': [make_dict(r) for r in results],
2533                'proof': proof.to_dict()
2534                }
2535
2536
2537    def get_multi_info(self, req, fid):
2538        """
2539        Return all the stored info that this fedid can access
2540        """
2541        rv = { 'info': [ ], 'proof': [ ] }
2542
2543        self.state_lock.acquire()
2544        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2545            try:
2546                proof = self.check_experiment_access(fid, key)
2547            except service_error, e:
2548                if e.code == service_error.access:
2549                    continue
2550                else:
2551                    self.state_lock.release()
2552                    raise e
2553
2554            if self.state.has_key(key):
2555                e = self.state[key].get_info()
2556                e['proof'] = proof.to_dict()
2557                rv['info'].append(e)
2558                rv['proof'].append(proof.to_dict())
2559        self.state_lock.release()
2560        return rv
2561
2562    def check_termination_status(self, fed_exp, force):
2563        """
2564        Confirm that the experiment is sin a valid state to stop (or force it)
2565        return the state - invalid states for deletion and force settings cause
2566        exceptions.
2567        """
2568        self.state_lock.acquire()
2569        status = fed_exp.status
2570
2571        if status:
2572            if status in ('starting', 'terminating'):
2573                if not force:
2574                    self.state_lock.release()
2575                    raise service_error(service_error.partial, 
2576                            'Experiment still being created or destroyed')
2577                else:
2578                    self.log.warning('Experiment in %s state ' % status + \
2579                            'being terminated by force.')
2580            self.state_lock.release()
2581            return status
2582        else:
2583            # No status??? trouble
2584            self.state_lock.release()
2585            raise service_error(service_error.internal,
2586                    "Experiment has no status!?")
2587
2588    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2589        ids = []
2590        term_params = { }
2591        if need_lock: self.state_lock.acquire()
2592        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2593        expcert = fed_exp.identity
2594        repo = "%s" % fed_exp.fedid
2595
2596        # Collect the allocation/segment ids into a dict keyed by the fedid
2597        # of the allocation that contains a tuple of uri, aid
2598        for i, fed in enumerate(fed_exp.get_all_allocations()):
2599            uri = fed.uri
2600            aid = fed.allocID
2601            if key == 'aid': term_params[aid] = (uri, aid)
2602            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2603
2604        if need_lock: self.state_lock.release()
2605        return ids, term_params, expcert, repo
2606
2607
2608    def get_termination_info(self, fed_exp):
2609        self.state_lock.acquire()
2610        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2611        # Change the experiment state
2612        fed_exp.status = 'terminating'
2613        fed_exp.updated()
2614        if self.state_filename: self.write_state()
2615        self.state_lock.release()
2616
2617        return ids, term_params, expcert, repo
2618
2619
2620    def deallocate_resources(self, term_params, expcert, status, force, 
2621            dealloc_log):
2622        tmpdir = None
2623        # This try block makes sure the tempdir is cleared
2624        try:
2625            # If no expcert, try the deallocation as the experiment
2626            # controller instance.
2627            if expcert and self.auth_type != 'legacy': 
2628                try:
2629                    tmpdir = tempfile.mkdtemp(prefix="term-")
2630                except EnvironmentError:
2631                    raise service_error(service_error.internal, 
2632                            "Cannot create tmp dir")
2633                cert_file = self.make_temp_certfile(expcert, tmpdir)
2634                pw = None
2635            else: 
2636                cert_file = self.cert_file
2637                pw = self.cert_pwd
2638
2639            # Stop everyone.  NB, wait_for_all waits until a thread starts
2640            # and then completes, so we can't wait if nothing starts.  So,
2641            # no tbparams, no start.
2642            if len(term_params) > 0:
2643                tp = thread_pool(self.nthreads)
2644                for k, (uri, aid) in term_params.items():
2645                    # Create and start a thread to stop the segment
2646                    tp.wait_for_slot()
2647                    t  = pooled_thread(\
2648                            target=self.terminate_segment(log=dealloc_log,
2649                                testbed=uri,
2650                                cert_file=cert_file, 
2651                                cert_pwd=pw,
2652                                trusted_certs=self.trusted_certs,
2653                                caller=self.call_TerminateSegment),
2654                            args=(uri, aid), name=k,
2655                            pdata=tp, trace_file=self.trace_file)
2656                    t.start()
2657                # Wait for completions
2658                tp.wait_for_all_done()
2659
2660            # release the allocations (failed experiments have done this
2661            # already, and starting experiments may be in odd states, so we
2662            # ignore errors releasing those allocations
2663            try: 
2664                for k, (uri, aid)  in term_params.items():
2665                    self.release_access(None, aid, uri=uri,
2666                            cert_file=cert_file, cert_pwd=pw)
2667            except service_error, e:
2668                if status != 'failed' and not force:
2669                    raise e
2670
2671        # Clean up the tmpdir no matter what
2672        finally:
2673            if tmpdir: self.remove_dirs(tmpdir)
2674
2675    def terminate_experiment(self, req, fid):
2676        """
2677        Swap this experiment out on the federants and delete the shared
2678        information
2679        """
2680        tbparams = { }
2681        req = req.get('TerminateRequestBody', None)
2682        if not req:
2683            raise service_error(service_error.req,
2684                    "Bad request format (no TerminateRequestBody)")
2685
2686        key = self.get_experiment_key(req, 'experiment')
2687        proof = self.check_experiment_access(fid, key)
2688        exp = req.get('experiment', False)
2689        force = req.get('force', False)
2690
2691        dealloc_list = [ ]
2692
2693
2694        # Create a logger that logs to the dealloc_list as well as to the main
2695        # log file.
2696        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2697        dealloc_log.info("Terminating %s " %key)
2698        h = logging.StreamHandler(self.list_log(dealloc_list))
2699        # XXX: there should be a global one of these rather than repeating the
2700        # code.
2701        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2702                    '%d %b %y %H:%M:%S'))
2703        dealloc_log.addHandler(h)
2704
2705        self.state_lock.acquire()
2706        fed_exp = self.state.get(key, None)
2707        self.state_lock.release()
2708        repo = None
2709
2710        if fed_exp:
2711            status = self.check_termination_status(fed_exp, force)
2712            # get_termination_info updates the experiment state
2713            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2714            self.deallocate_resources(term_params, expcert, status, force, 
2715                    dealloc_log)
2716
2717            # Remove the terminated experiment
2718            self.state_lock.acquire()
2719            for id in ids:
2720                self.clear_experiment_authorization(id, need_state_lock=False)
2721                if id in self.state: del self.state[id]
2722
2723            if self.state_filename: self.write_state()
2724            self.state_lock.release()
2725
2726            # Delete any synch points associated with this experiment.  All
2727            # synch points begin with the fedid of the experiment.
2728            fedid_keys = set(["fedid:%s" % f for f in ids \
2729                    if isinstance(f, fedid)])
2730            for k in self.synch_store.all_keys():
2731                try:
2732                    if len(k) > 45 and k[0:46] in fedid_keys:
2733                        self.synch_store.del_value(k)
2734                except synch_store.BadDeletionError:
2735                    pass
2736            self.write_store()
2737
2738            # Remove software and other cached stuff from the filesystem.
2739            if repo:
2740                self.remove_dirs("%s/%s" % (self.repodir, repo))
2741       
2742            return { 
2743                    'experiment': exp , 
2744                    'deallocationLog': string.join(dealloc_list, ''),
2745                    'proof': [proof.to_dict()],
2746                    }
2747        else:
2748            raise service_error(service_error.req, "No saved state")
2749
2750
2751    def GetValue(self, req, fid):
2752        """
2753        Get a value from the synchronized store
2754        """
2755        req = req.get('GetValueRequestBody', None)
2756        if not req:
2757            raise service_error(service_error.req,
2758                    "Bad request format (no GetValueRequestBody)")
2759       
2760        name = req.get('name', None)
2761        wait = req.get('wait', False)
2762        rv = { 'name': name }
2763
2764        if not name:
2765            raise service_error(service_error.req, "No name?")
2766
2767        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2768
2769        if access_ok:
2770            self.log.debug("[GetValue] asking for %s " % name)
2771            try:
2772                v = self.synch_store.get_value(name, wait)
2773            except synch_store.RevokedKeyError:
2774                # No more synch on this key
2775                raise service_error(service_error.federant, 
2776                        "Synch key %s revoked" % name)
2777            if v is not None:
2778                rv['value'] = v
2779            rv['proof'] = proof.to_dict()
2780            self.log.debug("[GetValue] got %s from %s" % (v, name))
2781            return rv
2782        else:
2783            raise service_error(service_error.access, "Access Denied",
2784                    proof=proof)
2785       
2786
2787    def SetValue(self, req, fid):
2788        """
2789        Set a value in the synchronized store
2790        """
2791        req = req.get('SetValueRequestBody', None)
2792        if not req:
2793            raise service_error(service_error.req,
2794                    "Bad request format (no SetValueRequestBody)")
2795       
2796        name = req.get('name', None)
2797        v = req.get('value', '')
2798
2799        if not name:
2800            raise service_error(service_error.req, "No name?")
2801
2802        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2803
2804        if access_ok:
2805            try:
2806                self.synch_store.set_value(name, v)
2807                self.write_store()
2808                self.log.debug("[SetValue] set %s to %s" % (name, v))
2809            except synch_store.CollisionError:
2810                # Translate into a service_error
2811                raise service_error(service_error.req,
2812                        "Value already set: %s" %name)
2813            except synch_store.RevokedKeyError:
2814                # No more synch on this key
2815                raise service_error(service_error.federant, 
2816                        "Synch key %s revoked" % name)
2817                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2818        else:
2819            raise service_error(service_error.access, "Access Denied",
2820                    proof=proof)
Note: See TracBrowser for help on using the repository browser.