source: fedd/federation/experiment_control.py @ d58ee5e

compt_changes
Last change on this file since d58ee5e was d58ee5e, checked in by Ted Faber <faber@…>, 12 years ago

ALways save creds, even on partial import

  • Property mode set to 100644
File size: 93.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from deter import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info, federated_service
39from operation_status import operation_status
40
41from deter import topdl
42from deter import ip_allocator
43from deter import ip_addr
44import list_log
45
46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
53class experiment_control_local(experiment_control_legacy):
54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
60
61    class ssh_cmd_timeout(RuntimeError): pass
62   
63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
65    call_StartSegment = service_caller('StartSegment')
66    call_TerminateSegment = service_caller('TerminateSegment')
67    call_InfoSegment = service_caller('InfoSegment')
68    call_OperationSegment = service_caller('OperationSegment')
69    call_Ns2Topdl = service_caller('Ns2Topdl')
70
71    def __init__(self, config=None, auth=None):
72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
90        self.list_log = list_log.list_log
91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
102        self.repodir = config.get("experiment_control", "repodir")
103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
105
106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
110        self.nthreads = 10
111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131
132        dt = config.get("experiment_control", "direct_transit")
133        self.auth_type = config.get('experiment_control', 'auth_type') \
134                or 'legacy'
135        self.auth_dir = config.get('experiment_control', 'auth_dir')
136        # XXX: document this!
137        self.info_cache_limit = \
138                config.getint('experiment_control', 'info_cache', 600)
139        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
140        else: self.direct_transit = [ ]
141        # NB for internal master/slave ops, not experiment setup
142        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
143
144        self.overrides = set([])
145        ovr = config.get('experiment_control', 'overrides')
146        if ovr:
147            for o in ovr.split(","):
148                o = o.strip()
149                if o.startswith('fedid:'): o = o[len('fedid:'):]
150                self.overrides.add(fedid(hexstr=o))
151
152        self.state = { }
153        self.state_lock = Lock()
154        self.tclsh = "/usr/local/bin/otclsh"
155        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
156                config.get("experiment_control", "tcl_splitter",
157                        "/usr/testbed/lib/ns2ir/parse.tcl")
158        mapdb_file = config.get("experiment_control", "mapdb")
159        self.trace_file = sys.stderr
160
161        self.def_expstart = \
162                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
163                "/tmp/federate";
164        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
165                "FEDDIR/hosts";
166        self.def_gwstart = \
167                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
168                "/tmp/bridge.log";
169        self.def_mgwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
171                "/tmp/bridge.log";
172        self.def_gwimage = "FBSD61-TUNNEL2";
173        self.def_gwtype = "pc";
174        self.local_access = { }
175
176        if self.auth_type == 'legacy':
177            if auth:
178                self.auth = auth
179            else:
180                self.log.error( "[access]: No authorizer initialized, " +\
181                        "creating local one.")
182                auth = authorizer()
183            self.get_access = self.legacy_get_access
184        elif self.auth_type == 'abac':
185            self.auth = abac_authorizer(load=self.auth_dir)
186        else:
187            raise service_error(service_error.internal, 
188                    "Unknown auth_type: %s" % self.auth_type)
189
190        if mapdb_file:
191            self.read_mapdb(mapdb_file)
192        else:
193            self.log.warn("[experiment_control] No testbed map, using defaults")
194            self.tbmap = { 
195                    'deter':'https://users.isi.deterlab.net:23235',
196                    'emulab':'https://users.isi.deterlab.net:23236',
197                    'ucb':'https://users.isi.deterlab.net:23237',
198                    }
199
200        # Grab saved state.  OK to do this w/o locking because it's read only
201        # and only one thread should be in existence that can see self.state at
202        # this point.
203        if self.state_filename:
204            self.read_state()
205
206        if self.store_filename:
207            self.read_store()
208        else:
209            self.log.warning("No saved synch store")
210            self.synch_store = synch_store
211
212        # Dispatch tables
213        self.soap_services = {\
214                'New': soap_handler('New', self.new_experiment),
215                'Create': soap_handler('Create', self.create_experiment),
216                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
217                'Vis': soap_handler('Vis', self.get_vis),
218                'Info': soap_handler('Info', self.get_info),
219                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
220                'Operation': soap_handler('Operation', self.do_operation),
221                'Terminate': soap_handler('Terminate', 
222                    self.terminate_experiment),
223                'GetValue': soap_handler('GetValue', self.GetValue),
224                'SetValue': soap_handler('SetValue', self.SetValue),
225        }
226
227        self.xmlrpc_services = {\
228                'New': xmlrpc_handler('New', self.new_experiment),
229                'Create': xmlrpc_handler('Create', self.create_experiment),
230                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
231                'Vis': xmlrpc_handler('Vis', self.get_vis),
232                'Info': xmlrpc_handler('Info', self.get_info),
233                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
234                'Terminate': xmlrpc_handler('Terminate',
235                    self.terminate_experiment),
236                'Operation': xmlrpc_handler('Operation', self.do_operation),
237                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
238                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
239        }
240
241    # Call while holding self.state_lock
242    def write_state(self):
243        """
244        Write a new copy of experiment state after copying the existing state
245        to a backup.
246
247        State format is a simple pickling of the state dictionary.
248        """
249        if os.access(self.state_filename, os.W_OK):
250            copy_file(self.state_filename, \
251                    "%s.bak" % self.state_filename)
252        try:
253            f = open(self.state_filename, 'w')
254            pickle.dump(self.state, f)
255        except EnvironmentError, e:
256            self.log.error("Can't write file %s: %s" % \
257                    (self.state_filename, e))
258        except pickle.PicklingError, e:
259            self.log.error("Pickling problem: %s" % e)
260        except TypeError, e:
261            self.log.error("Pickling problem (TypeError): %s" % e)
262
263    @staticmethod
264    def get_alloc_ids(exp):
265        """
266        Used by read_store and read state.  This used to be worse.
267        """
268
269        return [ a.allocID for a in exp.get_all_allocations() ]
270
271
272    # Call while holding self.state_lock
273    def read_state(self):
274        """
275        Read a new copy of experiment state.  Old state is overwritten.
276
277        State format is a simple pickling of the state dictionary.
278        """
279       
280        try:
281            f = open(self.state_filename, "r")
282            self.state = pickle.load(f)
283            self.log.debug("[read_state]: Read state from %s" % \
284                    self.state_filename)
285        except EnvironmentError, e:
286            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
287                    % (self.state_filename, e))
288        except pickle.UnpicklingError, e:
289            self.log.warning(("[read_state]: No saved state: " + \
290                    "Unpickling failed: %s") % e)
291       
292        for s in self.state.values():
293            try:
294
295                eid = s.fedid
296                if eid : 
297                    if self.auth_type == 'legacy':
298                        # XXX: legacy
299                        # Give the owner rights to the experiment
300                        #self.auth.set_attribute(s['owner'], eid)
301                        # And holders of the eid as well
302                        self.auth.set_attribute(eid, eid)
303                        # allow overrides to control experiments as well
304                        for o in self.overrides:
305                            self.auth.set_attribute(o, eid)
306                        # Set permissions to allow reading of the software
307                        # repo, if any, as well.
308                        for a in self.get_alloc_ids(s):
309                            self.auth.set_attribute(a, 'repo/%s' % eid)
310                else:
311                    raise KeyError("No experiment id")
312            except KeyError, e:
313                self.log.warning("[read_state]: State ownership or identity " +\
314                        "misformatted in %s: %s" % (self.state_filename, e))
315
316    def read_mapdb(self, file):
317        """
318        Read a simple colon separated list of mappings for the
319        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
320        also adds testbeds to active if they include , active after
321        their name.
322        """
323
324        self.tbmap = { }
325        self.tbactive = set()
326        lineno =0
327        try:
328            f = open(file, "r")
329            for line in f:
330                lineno += 1
331                line = line.strip()
332                if line.startswith('#') or len(line) == 0:
333                    continue
334                try:
335                    label, url = line.split(':', 1)
336                    if ',' in label:
337                        label, act = label.split(',', 1)
338                        active = (act.strip() == 'active')
339                    else:
340                        active = False
341                    self.tbmap[label] = url
342                    if active: self.tbactive.add(label)
343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
346        except EnvironmentError, e:
347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
349        else:
350            f.close()
351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
358        except EnvironmentError, e:
359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
391        except EnvironmentError, e:
392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
397
398    def remove_dirs(self, dir):
399        """
400        Remove the directory tree and all files rooted at dir.  Log any errors,
401        but continue.
402        """
403        self.log.debug("[removedirs]: removing %s" % dir)
404        try:
405            for path, dirs, files in os.walk(dir, topdown=False):
406                for f in files:
407                    os.remove(os.path.join(path, f))
408                for d in dirs:
409                    os.rmdir(os.path.join(path, d))
410            os.rmdir(dir)
411        except EnvironmentError, e:
412            self.log.error("Error deleting directory tree in %s" % e);
413
414    @staticmethod
415    def make_temp_certfile(expcert, tmpdir):
416        """
417        make a protected copy of the access certificate so the experiment
418        controller can act as the experiment principal.  mkstemp is the most
419        secure way to do that. The directory should be created by
420        mkdtemp.  Return the filename.
421        """
422        if expcert and tmpdir:
423            try:
424                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
425                f = os.fdopen(certf, 'w')
426                print >> f, expcert
427                f.close()
428            except EnvironmentError, e:
429                raise service_error(service_error.internal, 
430                        "Cannot create temp cert file?")
431            return certfn
432        else:
433            return None
434
435       
436    def generate_ssh_keys(self, dest, type="rsa" ):
437        """
438        Generate a set of keys for the gateways to use to talk.
439
440        Keys are of type type and are stored in the required dest file.
441        """
442        valid_types = ("rsa", "dsa")
443        t = type.lower();
444        if t not in valid_types: raise ValueError
445        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
446
447        try:
448            trace = open("/dev/null", "w")
449        except EnvironmentError:
450            raise service_error(service_error.internal,
451                    "Cannot open /dev/null??");
452
453        # May raise CalledProcessError
454        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
455        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
456        if rv != 0:
457            raise service_error(service_error.internal, 
458                    "Cannot generate nonce ssh keys.  %s return code %d" \
459                            % (self.ssh_keygen, rv))
460
461    def generate_seer_certs(self, destdir):
462        '''
463        Create a SEER ca cert and a node cert in destdir/ca.pem and
464        destdir/node.pem respectively.  These will be distributed throughout
465        the federated experiment.  This routine reports errors via
466        service_errors.
467        '''
468        openssl = '/usr/bin/openssl'
469        # All the filenames and parameters we need for openssl calls below
470        ca_key =os.path.join(destdir, 'ca.key') 
471        ca_pem = os.path.join(destdir, 'ca.pem')
472        node_key =os.path.join(destdir, 'node.key') 
473        node_pem = os.path.join(destdir, 'node.pem')
474        node_req = os.path.join(destdir, 'node.req')
475        node_signed = os.path.join(destdir, 'node.signed')
476        days = '%s' % (365 * 10)
477        serial = '%s' % random.randint(0, 1<<16)
478
479        try:
480            # Sequence of calls to create a CA key, create a ca cert, create a
481            # node key, node signing request, and finally a signed node
482            # certificate.
483            sequence = (
484                    (openssl, 'genrsa', '-out', ca_key, '1024'),
485                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
486                        ca_pem, '-days', days, '-subj', 
487                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
488                    (openssl, 'genrsa', '-out', node_key, '1024'),
489                    (openssl, 'req', '-new', '-key', node_key, '-out', 
490                        node_req, '-days', days, '-subj', 
491                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
492                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
493                        '-set_serial', serial, '-req', '-in', node_req, 
494                        '-out', node_signed, '-days', days),
495                )
496            # Do all that stuff; bail if there's an error, and push all the
497            # output to dev/null.
498            for cmd in sequence:
499                trace = open("/dev/null", "w")
500                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
501                if rv != 0:
502                    raise service_error(service_error.internal, 
503                            "Cannot generate SEER certs.  %s return code %d" \
504                                    % (' '.join(cmd), rv))
505            # Concatinate the node key and signed certificate into node.pem
506            f = open(node_pem, 'w')
507            for comp in (node_signed, node_key):
508                g = open(comp, 'r')
509                f.write(g.read())
510                g.close()
511            f.close()
512
513            # Throw out intermediaries.
514            for fn in (ca_key, node_key, node_req, node_signed):
515                os.unlink(fn)
516
517        except EnvironmentError, e:
518            # Any difficulties with the file system wind up here
519            raise service_error(service_error.internal,
520                    "File error on  %s while creating SEER certs: %s" % \
521                            (e.filename, e.strerror))
522
523
524
525    def gentopo(self, str):
526        """
527        Generate the topology data structure from the splitter's XML
528        representation of it.
529
530        The topology XML looks like:
531            <experiment>
532                <nodes>
533                    <node><vname></vname><ips>ip1:ip2</ips></node>
534                </nodes>
535                <lans>
536                    <lan>
537                        <vname></vname><vnode></vnode><ip></ip>
538                        <bandwidth></bandwidth><member>node:port</member>
539                    </lan>
540                </lans>
541        """
542        class topo_parse:
543            """
544            Parse the topology XML and create the dats structure.
545            """
546            def __init__(self):
547                # Typing of the subelements for data conversion
548                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
549                self.int_subelements = ( 'bandwidth',)
550                self.float_subelements = ( 'delay',)
551                # The final data structure
552                self.nodes = [ ]
553                self.lans =  [ ]
554                self.topo = { \
555                        'node': self.nodes,\
556                        'lan' : self.lans,\
557                    }
558                self.element = { }  # Current element being created
559                self.chars = ""     # Last text seen
560
561            def end_element(self, name):
562                # After each sub element the contents is added to the current
563                # element or to the appropriate list.
564                if name == 'node':
565                    self.nodes.append(self.element)
566                    self.element = { }
567                elif name == 'lan':
568                    self.lans.append(self.element)
569                    self.element = { }
570                elif name in self.str_subelements:
571                    self.element[name] = self.chars
572                    self.chars = ""
573                elif name in self.int_subelements:
574                    self.element[name] = int(self.chars)
575                    self.chars = ""
576                elif name in self.float_subelements:
577                    self.element[name] = float(self.chars)
578                    self.chars = ""
579
580            def found_chars(self, data):
581                self.chars += data.rstrip()
582
583
584        tp = topo_parse();
585        parser = xml.parsers.expat.ParserCreate()
586        parser.EndElementHandler = tp.end_element
587        parser.CharacterDataHandler = tp.found_chars
588
589        parser.Parse(str)
590
591        return tp.topo
592       
593
594    def genviz(self, topo):
595        """
596        Generate the visualization the virtual topology
597        """
598
599        neato = "/usr/local/bin/neato"
600        # These are used to parse neato output and to create the visualization
601        # file.
602        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
603        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
604                "%s</type></node>"
605
606        try:
607            # Node names
608            nodes = [ n['vname'] for n in topo['node'] ]
609            topo_lans = topo['lan']
610        except KeyError, e:
611            raise service_error(service_error.internal, "Bad topology: %s" %e)
612
613        lans = { }
614        links = { }
615
616        # Walk through the virtual topology, organizing the connections into
617        # 2-node connections (links) and more-than-2-node connections (lans).
618        # When a lan is created, it's added to the list of nodes (there's a
619        # node in the visualization for the lan).
620        for l in topo_lans:
621            if links.has_key(l['vname']):
622                if len(links[l['vname']]) < 2:
623                    links[l['vname']].append(l['vnode'])
624                else:
625                    nodes.append(l['vname'])
626                    lans[l['vname']] = links[l['vname']]
627                    del links[l['vname']]
628                    lans[l['vname']].append(l['vnode'])
629            elif lans.has_key(l['vname']):
630                lans[l['vname']].append(l['vnode'])
631            else:
632                links[l['vname']] = [ l['vnode'] ]
633
634
635        # Open up a temporary file for dot to turn into a visualization
636        try:
637            df, dotname = tempfile.mkstemp()
638            dotfile = os.fdopen(df, 'w')
639        except EnvironmentError:
640            raise service_error(service_error.internal,
641                    "Failed to open file in genviz")
642
643        try:
644            dnull = open('/dev/null', 'w')
645        except EnvironmentError:
646            service_error(service_error.internal,
647                    "Failed to open /dev/null in genviz")
648
649        # Generate a dot/neato input file from the links, nodes and lans
650        try:
651            print >>dotfile, "graph G {"
652            for n in nodes:
653                print >>dotfile, '\t"%s"' % n
654            for l in links.keys():
655                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
656            for l in lans.keys():
657                for n in lans[l]:
658                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
659            print >>dotfile, "}"
660            dotfile.close()
661        except TypeError:
662            raise service_error(service_error.internal,
663                    "Single endpoint link in vtopo")
664        except EnvironmentError:
665            raise service_error(service_error.internal, "Cannot write dot file")
666
667        # Use dot to create a visualization
668        try:
669            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
670                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
671                stderr=dnull, close_fds=True)
672        except EnvironmentError:
673            raise service_error(service_error.internal, 
674                    "Cannot generate visualization: is graphviz available?")
675        dnull.close()
676
677        # Translate dot to vis format
678        vis_nodes = [ ]
679        vis = { 'node': vis_nodes }
680        for line in dot.stdout:
681            m = vis_re.match(line)
682            if m:
683                vn = m.group(1)
684                vis_node = {'name': vn, \
685                        'x': float(m.group(2)),\
686                        'y' : float(m.group(3)),\
687                    }
688                if vn in links.keys() or vn in lans.keys():
689                    vis_node['type'] = 'lan'
690                else:
691                    vis_node['type'] = 'node'
692                vis_nodes.append(vis_node)
693        rv = dot.wait()
694
695        os.remove(dotname)
696        # XXX: graphviz seems to use low return codes for warnings, like
697        # "couldn't find font"
698        if rv < 2 : return vis
699        else: return None
700
701
702    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
703            cert_pwd=None):
704        """
705        Release access to testbed through fedd
706        """
707
708        if not uri and tbmap:
709            uri = tbmap.get(tb, None)
710        if not uri:
711            raise service_error(service_error.server_config, 
712                    "Unknown testbed: %s" % tb)
713
714        if self.local_access.has_key(uri):
715            resp = self.local_access[uri].ReleaseAccess(\
716                    { 'ReleaseAccessRequestBody' : 
717                        {'allocID': {'fedid': aid}},}, 
718                    fedid(file=cert_file))
719            resp = { 'ReleaseAccessResponseBody': resp } 
720        else:
721            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
722                    cert_file, cert_pwd, self.trusted_certs)
723
724        # better error coding
725
726    def remote_ns2topdl(self, uri, desc):
727
728        req = {
729                'description' : { 'ns2description': desc },
730            }
731
732        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
733                self.trusted_certs)
734
735        if r.has_key('Ns2TopdlResponseBody'):
736            r = r['Ns2TopdlResponseBody']
737            ed = r.get('experimentdescription', None)
738            if ed.has_key('topdldescription'):
739                return topdl.Topology(**ed['topdldescription'])
740            else:
741                raise service_error(service_error.protocol, 
742                        "Bad splitter response (no output)")
743        else:
744            raise service_error(service_error.protocol, "Bad splitter response")
745
746    class start_segment:
747        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
748                cert_pwd=None, trusted_certs=None, caller=None,
749                log_collector=None):
750            self.log = log
751            self.debug = debug
752            self.cert_file = cert_file
753            self.cert_pwd = cert_pwd
754            self.trusted_certs = None
755            self.caller = caller
756            self.testbed = testbed
757            self.log_collector = log_collector
758            self.response = None
759            self.node = { }
760            self.subs = { }
761            self.tb = { }
762            self.proof = None
763
764        def make_map(self, resp):
765            if 'segmentdescription' not in resp  or \
766                    'topdldescription' not in resp['segmentdescription']:
767                self.log.warn('No topology returned from startsegment')
768                return 
769
770            top = topdl.Topology(
771                    **resp['segmentdescription']['topdldescription'])
772
773            for e in top.elements:
774                if isinstance(e, topdl.Computer):
775                    self.node[e.name] = e
776                elif isinstance(e, topdl.Testbed):
777                    self.tb[e.uri] = e
778            for s in top.substrates:
779                self.subs[s.name] = s
780
781        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
782            req = {
783                    'allocID': { 'fedid' : aid }, 
784                    'segmentdescription': { 
785                        'topdldescription': topo.to_dict(),
786                    },
787                }
788
789            if connInfo:
790                req['connection'] = connInfo
791
792            import_svcs = [ s for m in masters.values() \
793                    for s in m if self.testbed in s.importers]
794
795            if import_svcs or self.testbed in masters:
796                req['service'] = []
797
798            for s in import_svcs:
799                for r in s.reqs:
800                    sr = copy.deepcopy(r)
801                    sr['visibility'] = 'import';
802                    req['service'].append(sr)
803
804            for s in masters.get(self.testbed, []):
805                for r in s.reqs:
806                    sr = copy.deepcopy(r)
807                    sr['visibility'] = 'export';
808                    req['service'].append(sr)
809
810            if attrs:
811                req['fedAttr'] = attrs
812
813            try:
814                self.log.debug("Calling StartSegment at %s " % uri)
815                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
816                        self.trusted_certs)
817                if r.has_key('StartSegmentResponseBody'):
818                    lval = r['StartSegmentResponseBody'].get('allocationLog',
819                            None)
820                    if lval and self.log_collector:
821                        for line in  lval.splitlines(True):
822                            self.log_collector.write(line)
823                    self.make_map(r['StartSegmentResponseBody'])
824                    if 'proof' in r: self.proof = r['proof']
825                    self.response = r
826                else:
827                    raise service_error(service_error.internal, 
828                            "Bad response!?: %s" %r)
829                return True
830            except service_error, e:
831                self.log.error("Start segment failed on %s: %s" % \
832                        (self.testbed, e))
833                return False
834
835
836
837    class terminate_segment:
838        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
839                cert_pwd=None, trusted_certs=None, caller=None):
840            self.log = log
841            self.debug = debug
842            self.cert_file = cert_file
843            self.cert_pwd = cert_pwd
844            self.trusted_certs = None
845            self.caller = caller
846            self.testbed = testbed
847
848        def __call__(self, uri, aid ):
849            req = {
850                    'allocID': {'fedid': aid }, 
851                }
852            self.log.info("Calling terminate segment")
853            try:
854                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
855                        self.trusted_certs)
856                self.log.info("Terminate segment succeeded")
857                return True
858            except service_error, e:
859                self.log.error("Terminate segment failed on %s: %s" % \
860                        (self.testbed, e))
861                return False
862
863    class info_segment(start_segment):
864        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
865                cert_pwd=None, trusted_certs=None, caller=None,
866                log_collector=None):
867            experiment_control_local.start_segment.__init__(self, debug, 
868                    log, testbed, cert_file, cert_pwd, trusted_certs, 
869                    caller, log_collector)
870
871        def __call__(self, uri, aid):
872            req = { 'allocID': { 'fedid' : aid } }
873
874            try:
875                self.log.debug("Calling InfoSegment at %s " % uri)
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                if r.has_key('InfoSegmentResponseBody'):
879                    self.make_map(r['InfoSegmentResponseBody'])
880                    if 'proof' in r: self.proof = r['proof']
881                    self.response = r
882                else:
883                    raise service_error(service_error.internal, 
884                            "Bad response!?: %s" %r)
885                return True
886            except service_error, e:
887                self.log.error("Info segment failed on %s: %s" % \
888                        (self.testbed, e))
889                return False
890
891    class operation_segment:
892        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
893                cert_pwd=None, trusted_certs=None, caller=None,
894                log_collector=None):
895            self.log = log
896            self.debug = debug
897            self.cert_file = cert_file
898            self.cert_pwd = cert_pwd
899            self.trusted_certs = None
900            self.caller = caller
901            self.testbed = testbed
902            self.status = None
903
904        def __call__(self, uri, aid, op, targets, params):
905            req = { 
906                    'allocID': { 'fedid' : aid },
907                    'operation': op,
908                    'target': targets,
909                    }
910            if params: req['parameter'] = params
911
912
913            try:
914                self.log.debug("Calling OperationSegment at %s " % uri)
915                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
916                        self.trusted_certs)
917                if 'OperationSegmentResponseBody' in r:
918                    r = r['OperationSegmentResponseBody']
919                    if 'status' in r:
920                        self.status = r['status']
921                else:
922                    raise service_error(service_error.internal, 
923                            "Bad response!?: %s" %r)
924                return True
925            except service_error, e:
926                self.log.error("Operation segment failed on %s: %s" % \
927                        (self.testbed, e))
928                return False
929
930    def annotate_topology(self, top, data):
931        # These routines do various parts of the annotation
932        def add_new_names(nl, l):
933            """ add any names in nl to the list in l """
934            for n in nl:
935                if n not in l: l.append(n)
936       
937        def merge_services(ne, e):
938            for ns in ne.service:
939                # NB: the else is on the for
940                for s in e.service:
941                    if ns.name == s.name:
942                        s.importer = ns.importer
943                        s.param = ns.param
944                        s.description = ns.description
945                        s.status = ns.status
946                        break
947                else:
948                    e.service.append(ns)
949       
950        def merge_oses(ne, e):
951            """
952            Merge the operating system entries of ne into e
953            """
954            for nos in ne.os:
955                # NB: the else is on the for
956                for os in e.os:
957                    if nos.name == os.name:
958                        os.version = nos.version
959                        os.version = nos.distribution
960                        os.version = nos.distributionversion
961                        for a in nos.attribute:
962                            if os.get_attribute(a.attribute):
963                                os.remove_attribute(a.attribute)
964                            os.set_attribute(a.attribute, a.value)
965                        break
966                else:
967                    # If both nodes have one OS, this is a replacement
968                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
969                    else: e.os.append(nos)
970
971        # Annotate the topology with embedding info
972        for e in top.elements:
973            if isinstance(e, topdl.Computer):
974                for s in data:
975                    ne = s.node.get(e.name, None)
976                    if ne is not None:
977                        add_new_names(ne.localname, e.localname)
978                        e.status = ne.status
979                        merge_services(ne, e)
980                        add_new_names(ne.operation, e.operation)
981                        if ne.os: merge_oses(ne, e)
982                        break
983            elif isinstance(e,topdl.Testbed):
984                for s in data:
985                    ne = s.tb.get(e.uri, None)
986                    if ne is not None:
987                        add_new_names(ne.localname, e.localname)
988                        add_new_names(ne.operation, e.operation)
989                        merge_services(ne, e)
990                        for a in ne.attribute:
991                            e.set_attribute(a.attribute, a.value)
992        # Annotate substrates
993        for s in top.substrates:
994            for d in data:
995                ss = d.subs.get(s.name, None)
996                if ss is not None:
997                    if ss.capacity is not None:
998                        s.capacity = ss.capacity
999                    if s.latency is not None:
1000                        s.latency = ss.latency
1001
1002
1003
1004    def allocate_resources(self, allocated, masters, eid, expid, 
1005            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1006            attrs=None, connInfo={}, tbmap=None, expcert=None):
1007
1008        started = { }           # Testbeds where a sub-experiment started
1009                                # successfully
1010
1011        # XXX
1012        fail_soft = False
1013
1014        if tbmap is None: tbmap = { }
1015
1016        log = alloc_log or self.log
1017
1018        tp = thread_pool(self.nthreads)
1019        threads = [ ]
1020        starters = [ ]
1021
1022        if expcert:
1023            cert = expcert
1024            pw = None
1025        else:
1026            cert = self.cert_file
1027            pw = self.cert_pwd
1028
1029        for tb in allocated.keys():
1030            # Create and start a thread to start the segment, and save it
1031            # to get the return value later
1032            tb_attrs = copy.copy(attrs)
1033            tp.wait_for_slot()
1034            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1035            base, suffix = split_testbed(tb)
1036            if suffix:
1037                tb_attrs.append({'attribute': 'experiment_name', 
1038                    'value': "%s-%s" % (eid, suffix)})
1039            else:
1040                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1041            if not uri:
1042                raise service_error(service_error.internal, 
1043                        "Unknown testbed %s !?" % tb)
1044
1045            aid = tbparams[tb].allocID
1046            if not aid:
1047                raise service_error(service_error.internal, 
1048                        "No alloc id for testbed %s !?" % tb)
1049
1050            s = self.start_segment(log=log, debug=self.debug,
1051                    testbed=tb, cert_file=cert,
1052                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1053                    caller=self.call_StartSegment,
1054                    log_collector=log_collector)
1055            starters.append(s)
1056            t  = pooled_thread(\
1057                    target=s, name=tb,
1058                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1059                    pdata=tp, trace_file=self.trace_file)
1060            threads.append(t)
1061            t.start()
1062
1063        # Wait until all finish (keep pinging the log, though)
1064        mins = 0
1065        revoked = False
1066        while not tp.wait_for_all_done(60.0):
1067            mins += 1
1068            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1069                    % mins)
1070            if not revoked and \
1071                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1072                # a testbed has failed.  Revoke this experiment's
1073                # synchronizarion values so that sub experiments will not
1074                # deadlock waiting for synchronization that will never happen
1075                self.log.info("A subexperiment has failed to swap in, " + \
1076                        "revoking synch keys")
1077                var_key = "fedid:%s" % expid
1078                for k in self.synch_store.all_keys():
1079                    if len(k) > 45 and k[0:46] == var_key:
1080                        self.synch_store.revoke_key(k)
1081                revoked = True
1082
1083        failed = [ t.getName() for t in threads if not t.rv ]
1084        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1085
1086        # If one failed clean up, unless fail_soft is set
1087        if failed:
1088            if not fail_soft:
1089                tp.clear()
1090                for tb in succeeded:
1091                    # Create and start a thread to stop the segment
1092                    tp.wait_for_slot()
1093                    uri = tbparams[tb].uri
1094                    t  = pooled_thread(\
1095                            target=self.terminate_segment(log=log,
1096                                testbed=tb,
1097                                cert_file=cert, 
1098                                cert_pwd=pw,
1099                                trusted_certs=self.trusted_certs,
1100                                caller=self.call_TerminateSegment),
1101                            args=(uri, tbparams[tb].allocID),
1102                            name=tb,
1103                            pdata=tp, trace_file=self.trace_file)
1104                    t.start()
1105                # Wait until all finish (if any are being stopped)
1106                if succeeded:
1107                    tp.wait_for_all_done()
1108
1109                # release the allocations
1110                for tb in tbparams.keys():
1111                    try:
1112                        self.release_access(tb, tbparams[tb].allocID, 
1113                                tbmap=tbmap, uri=tbparams[tb].uri,
1114                                cert_file=cert, cert_pwd=pw)
1115                    except service_error, e:
1116                        self.log.warn("Error releasing access: %s" % e.desc)
1117                # Remove the placeholder
1118                self.state_lock.acquire()
1119                self.state[eid].status = 'failed'
1120                self.state[eid].updated()
1121                if self.state_filename: self.write_state()
1122                self.state_lock.release()
1123                # Remove the repo dir
1124                self.remove_dirs("%s/%s" %(self.repodir, expid))
1125                # Walk up tmpdir, deleting as we go
1126                if self.cleanup:
1127                    self.remove_dirs(tmpdir)
1128                else:
1129                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1130
1131
1132                log.error("Swap in failed on %s" % ",".join(failed))
1133                return
1134        else:
1135            # Walk through the successes and gather the proofs
1136            proofs = { }
1137            for s in starters:
1138                if s.proof:
1139                    proofs[s.testbed] = s.proof
1140            self.annotate_topology(top, starters)
1141            log.info("[start_segment]: Experiment %s active" % eid)
1142
1143
1144        # Walk up tmpdir, deleting as we go
1145        if self.cleanup:
1146            self.remove_dirs(tmpdir)
1147        else:
1148            log.debug("[start_experiment]: not removing %s" % tmpdir)
1149
1150        # Insert the experiment into our state and update the disk copy.
1151        self.state_lock.acquire()
1152        self.state[expid].status = 'active'
1153        self.state[eid] = self.state[expid]
1154        self.state[eid].top = top
1155        self.state[eid].updated()
1156        # Append startup proofs
1157        for f in self.state[eid].get_all_allocations():
1158            if f.tb in proofs:
1159                f.proof.append(proofs[f.tb])
1160
1161        if self.state_filename: self.write_state()
1162        self.state_lock.release()
1163        return
1164
1165
1166    def add_kit(self, e, kit):
1167        """
1168        Add a Software object created from the list of (install, location)
1169        tuples passed as kit  to the software attribute of an object e.  We
1170        do this enough to break out the code, but it's kind of a hack to
1171        avoid changing the old tuple rep.
1172        """
1173
1174        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1175
1176        if isinstance(e.software, list): e.software.extend(s)
1177        else: e.software = s
1178
1179    def append_experiment_authorization(self, expid, attrs, 
1180            need_state_lock=True):
1181        """
1182        Append the authorization information to system state
1183        """
1184
1185        for p, a in attrs:
1186            self.auth.set_attribute(p, a)
1187        self.auth.save()
1188
1189        if need_state_lock: self.state_lock.acquire()
1190        # XXX: really a no op?
1191        #self.state[expid]['auth'].update(attrs)
1192        if self.state_filename: self.write_state()
1193        if need_state_lock: self.state_lock.release()
1194
1195    def clear_experiment_authorization(self, expid, need_state_lock=True):
1196        """
1197        Attrs is a set of attribute principal pairs that need to be removed
1198        from the authenticator.  Remove them and save the authenticator.
1199        """
1200
1201        if need_state_lock: self.state_lock.acquire()
1202        # XXX: should be a no-op
1203        #if expid in self.state and 'auth' in self.state[expid]:
1204            #for p, a in self.state[expid]['auth']:
1205                #self.auth.unset_attribute(p, a)
1206            #self.state[expid]['auth'] = set()
1207        if self.state_filename: self.write_state()
1208        if need_state_lock: self.state_lock.release()
1209        self.auth.save()
1210
1211
1212    def create_experiment_state(self, fid, req, expid, expcert,
1213            state='starting'):
1214        """
1215        Create the initial entry in the experiment's state.  The expid and
1216        expcert are the experiment's fedid and certifacte that represents that
1217        ID, which are installed in the experiment state.  If the request
1218        includes a suggested local name that is used if possible.  If the local
1219        name is already taken by an experiment owned by this user that has
1220        failed, it is overwritten.  Otherwise new letters are added until a
1221        valid localname is found.  The generated local name is returned.
1222        """
1223
1224        if req.has_key('experimentID') and \
1225                req['experimentID'].has_key('localname'):
1226            overwrite = False
1227            eid = req['experimentID']['localname']
1228            # If there's an old failed experiment here with the same local name
1229            # and accessible by this user, we'll overwrite it, otherwise we'll
1230            # fall through and do the collision avoidance.
1231            old_expid = self.get_experiment_fedid(eid)
1232            if old_expid:
1233                users_experiment = True
1234                try:
1235                    self.check_experiment_access(fid, old_expid)
1236                except service_error, e:
1237                    if e.code == service_error.access: users_experiment = False
1238                    else: raise e
1239                if users_experiment:
1240                    self.state_lock.acquire()
1241                    status = self.state[eid].status
1242                    if status and status == 'failed':
1243                        # remove the old access attributes
1244                        self.clear_experiment_authorization(eid,
1245                                need_state_lock=False)
1246                        overwrite = True
1247                        del self.state[eid]
1248                        del self.state[old_expid]
1249                    self.state_lock.release()
1250                else:
1251                    self.log.info('Experiment %s exists, ' % eid + \
1252                            'but this user cannot access it')
1253            self.state_lock.acquire()
1254            while (self.state.has_key(eid) and not overwrite):
1255                eid += random.choice(string.ascii_letters)
1256            # Initial state
1257            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1258                    identity=expcert)
1259            self.state[expid] = self.state[eid]
1260            if self.state_filename: self.write_state()
1261            self.state_lock.release()
1262        else:
1263            eid = self.exp_stem
1264            for i in range(0,5):
1265                eid += random.choice(string.ascii_letters)
1266            self.state_lock.acquire()
1267            while (self.state.has_key(eid)):
1268                eid = self.exp_stem
1269                for i in range(0,5):
1270                    eid += random.choice(string.ascii_letters)
1271            # Initial state
1272            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1273                    identity=expcert)
1274            self.state[expid] = self.state[eid]
1275            if self.state_filename: self.write_state()
1276            self.state_lock.release()
1277
1278        # Let users touch the state.  Authorize this fid and the expid itself
1279        # to touch the experiment, as well as allowing th eoverrides.
1280        self.append_experiment_authorization(eid, 
1281                set([(fid, expid), (expid,expid)] + \
1282                        [ (o, expid) for o in self.overrides]))
1283
1284        return eid
1285
1286
1287    def allocate_ips_to_topo(self, top):
1288        """
1289        Add an ip4_address attribute to all the hosts in the topology, based on
1290        the shared substrates on which they sit.  An /etc/hosts file is also
1291        created and returned as a list of hostfiles entries.  We also return
1292        the allocator, because we may need to allocate IPs to portals
1293        (specifically DRAGON portals).
1294        """
1295        subs = sorted(top.substrates, 
1296                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1297                reverse=True)
1298        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1299        ifs = { }
1300        hosts = [ ]
1301
1302        for idx, s in enumerate(subs):
1303            net_size = len(s.interfaces)+2
1304
1305            a = ips.allocate(net_size)
1306            if a :
1307                base, num = a
1308                if num < net_size: 
1309                    raise service_error(service_error.internal,
1310                            "Allocator returned wrong number of IPs??")
1311            else:
1312                raise service_error(service_error.req, 
1313                        "Cannot allocate IP addresses")
1314            mask = ips.min_alloc
1315            while mask < net_size:
1316                mask *= 2
1317
1318            netmask = ((2**32-1) ^ (mask-1))
1319
1320            base += 1
1321            for i in s.interfaces:
1322                i.attribute.append(
1323                        topdl.Attribute('ip4_address', 
1324                            "%s" % ip_addr(base)))
1325                i.attribute.append(
1326                        topdl.Attribute('ip4_netmask', 
1327                            "%s" % ip_addr(int(netmask))))
1328
1329                hname = i.element.name
1330                if ifs.has_key(hname):
1331                    hosts.append("%s\t%s-%s %s-%d" % \
1332                            (ip_addr(base), hname, s.name, hname,
1333                                ifs[hname]))
1334                else:
1335                    ifs[hname] = 0
1336                    hosts.append("%s\t%s-%s %s-%d %s" % \
1337                            (ip_addr(base), hname, s.name, hname,
1338                                ifs[hname], hname))
1339
1340                ifs[hname] += 1
1341                base += 1
1342        return hosts, ips
1343
1344    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1345            tbparam, masters, tbmap, expid=None, expcert=None):
1346        for tb in testbeds:
1347            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1348                    expcert)
1349            allocated[tb] = 1
1350
1351    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1352            expcert=None):
1353        """
1354        Get access to testbed through fedd and set the parameters for that tb
1355        """
1356        def get_export_project(svcs):
1357            """
1358            Look through for the list of federated_service for this testbed
1359            objects for a project_export service, and extract the project
1360            parameter.
1361            """
1362
1363            pe = [s for s in svcs if s.name=='project_export']
1364            if len(pe) == 1:
1365                return pe[0].params.get('project', None)
1366            elif len(pe) == 0:
1367                return None
1368            else:
1369                raise service_error(service_error.req,
1370                        "More than one project export is not supported")
1371
1372        def add_services(svcs, type, slist, keys):
1373            """
1374            Add the given services to slist.  type is import or export.  Also
1375            add a mapping entry from the assigned id to the original service
1376            record.
1377            """
1378            for i, s in enumerate(svcs):
1379                idx = '%s%d' % (type, i)
1380                keys[idx] = s
1381                sr = {'id': idx, 'name': s.name, 'visibility': type }
1382                if s.params:
1383                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1384                            for k, v in s.params.items()]
1385                slist.append(sr)
1386
1387        uri = tbmap.get(testbed_base(tb), None)
1388        if not uri:
1389            raise service_error(service_error.server_config, 
1390                    "Unknown testbed: %s" % tb)
1391
1392        export_svcs = masters.get(tb,[])
1393        import_svcs = [ s for m in masters.values() \
1394                for s in m \
1395                    if tb in s.importers ]
1396
1397        export_project = get_export_project(export_svcs)
1398        # Compose the credential list so that IDs come before attributes
1399        creds = set()
1400        keys = set()
1401        certs = self.auth.get_creds_for_principal(fid)
1402        # Append credenials about this experiment controller - e.g. that it is
1403        # trusted.
1404        certs.update(self.auth.get_creds_for_principal(
1405            fedid(file=self.cert_file)))
1406        if expid:
1407            certs.update(self.auth.get_creds_for_principal(expid))
1408        for c in certs:
1409            keys.add(c.issuer_cert())
1410            creds.add(c.attribute_cert())
1411        creds = list(keys) + list(creds)
1412
1413        if expcert: cert, pw = expcert, None
1414        else: cert, pw = self.cert_file, self.cert_pw
1415
1416        # Request credentials
1417        req = {
1418                'abac_credential': creds,
1419            }
1420        # Make the service request from the services we're importing and
1421        # exporting.  Keep track of the export request ids so we can
1422        # collect the resulting info from the access response.
1423        e_keys = { }
1424        if import_svcs or export_svcs:
1425            slist = []
1426            add_services(import_svcs, 'import', slist, e_keys)
1427            add_services(export_svcs, 'export', slist, e_keys)
1428            req['service'] = slist
1429
1430        if self.local_access.has_key(uri):
1431            # Local access call
1432            req = { 'RequestAccessRequestBody' : req }
1433            r = self.local_access[uri].RequestAccess(req, 
1434                    fedid(file=self.cert_file))
1435            r = { 'RequestAccessResponseBody' : r }
1436        else:
1437            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1438
1439        if r.has_key('RequestAccessResponseBody'):
1440            # Through to here we have a valid response, not a fault.
1441            # Access denied is a fault, so something better or worse than
1442            # access denied has happened.
1443            r = r['RequestAccessResponseBody']
1444            self.log.debug("[get_access] Access granted")
1445        else:
1446            raise service_error(service_error.protocol,
1447                        "Bad proxy response")
1448        if 'proof' not in r:
1449            raise service_error(service_error.protocol,
1450                        "Bad access response (no access proof)")
1451
1452        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1453                tb=tb, uri=uri, proof=[r['proof']], 
1454                services=masters.get(tb, None))
1455
1456        # Collect the responses corresponding to the services this testbed
1457        # exports.  These will be the service requests that we will include in
1458        # the start segment requests (with appropriate visibility values) to
1459        # import and export the segments.
1460        for s in r.get('service', []):
1461            id = s.get('id', None)
1462            # Note that this attaches the response to the object in the masters
1463            # data structure.  (The e_keys index disappears when this fcn
1464            # returns)
1465            if id and id in e_keys:
1466                e_keys[id].reqs.append(s)
1467
1468        # Add attributes to parameter space.  We don't allow attributes to
1469        # overlay any parameters already installed.
1470        for a in r.get('fedAttr', []):
1471            try:
1472                if a['attribute']:
1473                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1474            except KeyError:
1475                self.log.error("Bad attribute in response: %s" % a)
1476
1477
1478    def split_topology(self, top, topo, testbeds):
1479        """
1480        Create the sub-topologies that are needed for experiment instantiation.
1481        """
1482        for tb in testbeds:
1483            topo[tb] = top.clone()
1484            # copy in for loop allows deletions from the original
1485            for e in [ e for e in topo[tb].elements]:
1486                etb = e.get_attribute('testbed')
1487                # NB: elements without a testbed attribute won't appear in any
1488                # sub topologies. 
1489                if not etb or etb != tb:
1490                    for i in e.interface:
1491                        for s in i.subs:
1492                            try:
1493                                s.interfaces.remove(i)
1494                            except ValueError:
1495                                raise service_error(service_error.internal,
1496                                        "Can't remove interface??")
1497                    topo[tb].elements.remove(e)
1498            topo[tb].make_indices()
1499
1500    def confirm_software(self, top):
1501        """
1502        Make sure that the software to be loaded in the topo is all available
1503        before we begin making access requests, etc.  This is a subset of
1504        wrangle_software.
1505        """
1506        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1507        pkgs.update([x.location for e in top.elements for x in e.software])
1508
1509        for pkg in pkgs:
1510            loc = pkg
1511
1512            scheme, host, path = urlparse(loc)[0:3]
1513            dest = os.path.basename(path)
1514            if not scheme:
1515                if not loc.startswith('/'):
1516                    loc = "/%s" % loc
1517                loc = "file://%s" %loc
1518            # NB: if scheme was found, loc == pkg
1519            try:
1520                u = urlopen(loc)
1521                u.close()
1522            except Exception, e:
1523                raise service_error(service_error.req, 
1524                        "Cannot open %s: %s" % (loc, e))
1525        return True
1526
1527    def wrangle_software(self, expid, top, topo, tbparams):
1528        """
1529        Copy software out to the repository directory, allocate permissions and
1530        rewrite the segment topologies to look for the software in local
1531        places.
1532        """
1533
1534        # Copy the rpms and tarfiles to a distribution directory from
1535        # which the federants can retrieve them
1536        linkpath = "%s/software" %  expid
1537        softdir ="%s/%s" % ( self.repodir, linkpath)
1538        softmap = { }
1539
1540        # self.fedkit and self.gateway kit are lists of tuples of
1541        # (install_location, download_location) this extracts the download
1542        # locations.
1543        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1544        pkgs.update([x.location for e in top.elements for x in e.software])
1545        try:
1546            os.makedirs(softdir)
1547        except EnvironmentError, e:
1548            raise service_error(
1549                    "Cannot create software directory: %s" % e)
1550        # The actual copying.  Everything's converted into a url for copying.
1551        auth_attrs = set()
1552        for pkg in pkgs:
1553            loc = pkg
1554
1555            scheme, host, path = urlparse(loc)[0:3]
1556            dest = os.path.basename(path)
1557            if not scheme:
1558                if not loc.startswith('/'):
1559                    loc = "/%s" % loc
1560                loc = "file://%s" %loc
1561            # NB: if scheme was found, loc == pkg
1562            try:
1563                u = urlopen(loc)
1564            except Exception, e:
1565                raise service_error(service_error.req, 
1566                        "Cannot open %s: %s" % (loc, e))
1567            try:
1568                f = open("%s/%s" % (softdir, dest) , "w")
1569                self.log.debug("Writing %s/%s" % (softdir,dest) )
1570                data = u.read(4096)
1571                while data:
1572                    f.write(data)
1573                    data = u.read(4096)
1574                f.close()
1575                u.close()
1576            except Exception, e:
1577                raise service_error(service_error.internal,
1578                        "Could not copy %s: %s" % (loc, e))
1579            path = re.sub("/tmp", "", linkpath)
1580            # XXX
1581            softmap[pkg] = \
1582                    "%s/%s/%s" %\
1583                    ( self.repo_url, path, dest)
1584
1585            # Allow the individual segments to access the software by assigning
1586            # an attribute to each testbed allocation that encodes the data to
1587            # be released.  This expression collects the data for each run of
1588            # the loop.
1589            auth_attrs.update([
1590                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1591                        for tb in tbparams.keys()])
1592
1593        self.append_experiment_authorization(expid, auth_attrs)
1594
1595        # Convert the software locations in the segments into the local
1596        # copies on this host
1597        for soft in [ s for tb in topo.values() \
1598                for e in tb.elements \
1599                    if getattr(e, 'software', False) \
1600                        for s in e.software ]:
1601            if softmap.has_key(soft.location):
1602                soft.location = softmap[soft.location]
1603
1604
1605    def new_experiment(self, req, fid):
1606        """
1607        The external interface to empty initial experiment creation called from
1608        the dispatcher.
1609
1610        Creates a working directory, splits the incoming description using the
1611        splitter script and parses out the avrious subsections using the
1612        lcasses above.  Once each sub-experiment is created, use pooled threads
1613        to instantiate them and start it all up.
1614        """
1615        self.log.info("New experiment call started for %s" % fid)
1616        req = req.get('NewRequestBody', None)
1617        if not req:
1618            raise service_error(service_error.req,
1619                    "Bad request format (no NewRequestBody)")
1620
1621        # import may partially succeed so always save credentials and warn
1622        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1623            self.log.debug("Failed to import delegation credentials(!)")
1624        self.auth.save()
1625
1626        try:
1627            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1628                    with_proof=True)
1629        except service_error, e:
1630            self.log.info("New experiment call for %s: access denied" % fid)
1631            raise e
1632
1633
1634        if not access_ok:
1635            self.log.info("New experiment call for %s: Access denied" % fid)
1636            raise service_error(service_error.access, "New access denied",
1637                    proof=[proof])
1638
1639        try:
1640            tmpdir = tempfile.mkdtemp(prefix="split-")
1641        except EnvironmentError:
1642            raise service_error(service_error.internal, "Cannot create tmp dir")
1643
1644        # Generate an ID for the experiment (slice) and a certificate that the
1645        # allocator can use to prove they own it.  We'll ship it back through
1646        # the encrypted connection.  If the requester supplied one, use it.
1647        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1648            expcert = req['experimentAccess']['X509']
1649            expid = fedid(certstr=expcert)
1650            self.state_lock.acquire()
1651            if expid in self.state:
1652                self.state_lock.release()
1653                raise service_error(service_error.req, 
1654                        'fedid %s identifies an existing experiment' % expid)
1655            self.state_lock.release()
1656        else:
1657            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1658
1659        #now we're done with the tmpdir, and it should be empty
1660        if self.cleanup:
1661            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1662            os.rmdir(tmpdir)
1663        else:
1664            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1665
1666        eid = self.create_experiment_state(fid, req, expid, expcert, 
1667                state='empty')
1668
1669        rv = {
1670                'experimentID': [
1671                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1672                ],
1673                'experimentStatus': 'empty',
1674                'experimentAccess': { 'X509' : expcert },
1675                'proof': proof.to_dict(),
1676            }
1677
1678        self.log.info("New experiment call succeeded for %s" % fid)
1679        return rv
1680
1681    # create_experiment sub-functions
1682
1683    @staticmethod
1684    def get_experiment_key(req, field='experimentID'):
1685        """
1686        Parse the experiment identifiers out of the request (the request body
1687        tag has been removed).  Specifically this pulls either the fedid or the
1688        localname out of the experimentID field.  A fedid is preferred.  If
1689        neither is present or the request does not contain the fields,
1690        service_errors are raised.
1691        """
1692        # Get the experiment access
1693        exp = req.get(field, None)
1694        if exp:
1695            if exp.has_key('fedid'):
1696                key = exp['fedid']
1697            elif exp.has_key('localname'):
1698                key = exp['localname']
1699            else:
1700                raise service_error(service_error.req, "Unknown lookup type")
1701        else:
1702            raise service_error(service_error.req, "No request?")
1703
1704        return key
1705
1706    def get_experiment_ids_and_start(self, key, tmpdir):
1707        """
1708        Get the experiment name, id and access certificate from the state, and
1709        set the experiment state to 'starting'.  returns a triple (fedid,
1710        localname, access_cert_file). The access_cert_file is a copy of the
1711        contents of the access certificate, created in the tempdir with
1712        restricted permissions.  If things are confused, raise an exception.
1713        """
1714
1715        expid = eid = None
1716        self.state_lock.acquire()
1717        if key in self.state:
1718            exp = self.state[key]
1719            exp.status = "starting"
1720            exp.updated()
1721            expid = exp.fedid
1722            eid = exp.localname
1723            expcert = exp.identity
1724        self.state_lock.release()
1725
1726        # make a protected copy of the access certificate so the experiment
1727        # controller can act as the experiment principal.
1728        if expcert:
1729            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1730            if not expcert_file:
1731                raise service_error(service_error.internal, 
1732                        "Cannot create temp cert file?")
1733        else:
1734            expcert_file = None
1735
1736        return (eid, expid, expcert_file)
1737
1738    def get_topology(self, req, tmpdir):
1739        """
1740        Get the ns2 content and put it into a file for parsing.  Call the local
1741        or remote parser and return the topdl.Topology.  Errors result in
1742        exceptions.  req is the request and tmpdir is a work directory.
1743        """
1744
1745        # The tcl parser needs to read a file so put the content into that file
1746        descr=req.get('experimentdescription', None)
1747        if descr:
1748            if 'ns2description' in descr:
1749                file_content=descr['ns2description']
1750            elif 'topdldescription' in descr:
1751                return topdl.Topology(**descr['topdldescription'])
1752            else:
1753                raise service_error(service_error.req, 
1754                        'Unknown experiment description type')
1755        else:
1756            raise service_error(service_error.req, "No experiment description")
1757
1758
1759        if self.splitter_url:
1760            self.log.debug("Calling remote topdl translator at %s" % \
1761                    self.splitter_url)
1762            top = self.remote_ns2topdl(self.splitter_url, file_content)
1763        else:
1764            tclfile = os.path.join(tmpdir, "experiment.tcl")
1765            if file_content:
1766                try:
1767                    f = open(tclfile, 'w')
1768                    f.write(file_content)
1769                    f.close()
1770                except EnvironmentError:
1771                    raise service_error(service_error.internal,
1772                            "Cannot write temp experiment description")
1773            else:
1774                raise service_error(service_error.req, 
1775                        "Only ns2descriptions supported")
1776            pid = "dummy"
1777            gid = "dummy"
1778            eid = "dummy"
1779
1780            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1781                str(self.muxmax), '-m', 'dummy']
1782
1783            tclcmd.extend([pid, gid, eid, tclfile])
1784
1785            self.log.debug("running local splitter %s", " ".join(tclcmd))
1786            # This is just fantastic.  As a side effect the parser copies
1787            # tb_compat.tcl into the current directory, so that directory
1788            # must be writable by the fedd user.  Doing this in the
1789            # temporary subdir ensures this is the case.
1790            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1791                    cwd=tmpdir)
1792            split_data = tclparser.stdout
1793
1794            top = topdl.topology_from_xml(file=split_data, top="experiment")
1795            os.remove(tclfile)
1796
1797        return top
1798
1799    def get_testbed_services(self, req, testbeds):
1800        """
1801        Parse the services section of the request into two dicts mapping
1802        testbed to lists of federated_service objects.  The first dict maps all
1803        exporters of services to those service objects, the second maps
1804        testbeds to service objects only for services requiring portals.
1805        """
1806        # We construct both dicts here because deriving the second is more
1807        # comples than it looks - both the keys and lists can differ, and it's
1808        # much easier to generate both in one pass.
1809        masters = { }
1810        pmasters = { }
1811        for s in req.get('service', []):
1812            # If this is a service request with the importall field
1813            # set, fill it out.
1814
1815            if s.get('importall', False):
1816                s['import'] = [ tb for tb in testbeds \
1817                        if tb not in s.get('export',[])]
1818                del s['importall']
1819
1820            # Add the service to masters
1821            for tb in s.get('export', []):
1822                if s.get('name', None):
1823
1824                    params = { }
1825                    for a in s.get('fedAttr', []):
1826                        params[a.get('attribute', '')] = a.get('value','')
1827
1828                    fser = federated_service(name=s['name'],
1829                            exporter=tb, importers=s.get('import',[]),
1830                            params=params)
1831                    if fser.name == 'hide_hosts' \
1832                            and 'hosts' not in fser.params:
1833                        fser.params['hosts'] = \
1834                                ",".join(tb_hosts.get(fser.exporter, []))
1835                    if tb in masters: masters[tb].append(fser)
1836                    else: masters[tb] = [fser]
1837
1838                    if fser.portal:
1839                        if tb in pmasters: pmasters[tb].append(fser)
1840                        else: pmasters[tb] = [fser]
1841                else:
1842                    self.log.error('Testbed service does not have name " + \
1843                            "and importers')
1844        return masters, pmasters
1845
1846    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1847        """
1848        Create the ssh keys necessary for interconnecting the portal nodes and
1849        the global hosts file for letting each segment know about the IP
1850        addresses in play.  Save these into the repo.  Add attributes to the
1851        autorizer allowing access controllers to download them and return a set
1852        of attributes that inform the segments where to find this stuff.  May
1853        raise service_errors in if there are problems.
1854        """
1855        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1856        gw_secretkey_base = "fed.%s" % self.ssh_type
1857        keydir = os.path.join(tmpdir, 'keys')
1858        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1859        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1860
1861        try:
1862            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1863        except ValueError:
1864            raise service_error(service_error.server_config, 
1865                    "Bad key type (%s)" % self.ssh_type)
1866
1867        self.generate_seer_certs(keydir)
1868
1869        # Copy configuration files into the remote file store
1870        # The config urlpath
1871        configpath = "/%s/config" % expid
1872        # The config file system location
1873        configdir ="%s%s" % ( self.repodir, configpath)
1874        try:
1875            os.makedirs(configdir)
1876        except EnvironmentError, e:
1877            raise service_error(service_error.internal,
1878                    "Cannot create config directory: %s" % e)
1879        try:
1880            f = open("%s/hosts" % configdir, "w")
1881            print >> f, string.join(hosts, '\n')
1882            f.close()
1883        except EnvironmentError, e:
1884            raise service_error(service_error.internal, 
1885                    "Cannot write hosts file: %s" % e)
1886        try:
1887            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1888            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1889            copy_file(os.path.join(keydir, 'ca.pem'), 
1890                    os.path.join(configdir, 'ca.pem'))
1891            copy_file(os.path.join(keydir, 'node.pem'), 
1892                    os.path.join(configdir, 'node.pem'))
1893        except EnvironmentError, e:
1894            raise service_error(service_error.internal, 
1895                    "Cannot copy keyfiles: %s" % e)
1896
1897        # Allow the individual testbeds to access the configuration files,
1898        # again by setting an attribute for the relevant pathnames on each
1899        # allocation principal.  Yeah, that's a long list comprehension.
1900        self.append_experiment_authorization(expid, set([
1901            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1902                    for tb in tbparams.keys() \
1903                        for f in ("hosts", 'ca.pem', 'node.pem', 
1904                            gw_secretkey_base, gw_pubkey_base)]))
1905
1906        attrs = [ 
1907                {
1908                    'attribute': 'ssh_pubkey', 
1909                    'value': '%s/%s/config/%s' % \
1910                            (self.repo_url, expid, gw_pubkey_base)
1911                },
1912                {
1913                    'attribute': 'ssh_secretkey', 
1914                    'value': '%s/%s/config/%s' % \
1915                            (self.repo_url, expid, gw_secretkey_base)
1916                },
1917                {
1918                    'attribute': 'hosts', 
1919                    'value': '%s/%s/config/hosts' % \
1920                            (self.repo_url, expid)
1921                },
1922                {
1923                    'attribute': 'seer_ca_pem', 
1924                    'value': '%s/%s/config/%s' % \
1925                            (self.repo_url, expid, 'ca.pem')
1926                },
1927                {
1928                    'attribute': 'seer_node_pem', 
1929                    'value': '%s/%s/config/%s' % \
1930                            (self.repo_url, expid, 'node.pem')
1931                },
1932            ]
1933        return attrs
1934
1935
1936    def get_vtopo(self, req, fid):
1937        """
1938        Return the stored virtual topology for this experiment
1939        """
1940        rv = None
1941        state = None
1942        self.log.info("vtopo call started for %s" %  fid)
1943
1944        req = req.get('VtopoRequestBody', None)
1945        if not req:
1946            raise service_error(service_error.req,
1947                    "Bad request format (no VtopoRequestBody)")
1948        exp = req.get('experiment', None)
1949        if exp:
1950            if exp.has_key('fedid'):
1951                key = exp['fedid']
1952                keytype = "fedid"
1953            elif exp.has_key('localname'):
1954                key = exp['localname']
1955                keytype = "localname"
1956            else:
1957                raise service_error(service_error.req, "Unknown lookup type")
1958        else:
1959            raise service_error(service_error.req, "No request?")
1960
1961        try:
1962            proof = self.check_experiment_access(fid, key)
1963        except service_error, e:
1964            self.log.info("vtopo call failed for %s: access denied" %  fid)
1965            raise e
1966
1967        self.state_lock.acquire()
1968        # XXX: this needs to be recalculated
1969        if key in self.state:
1970            if self.state[key].top is not None:
1971                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1972                rv = { 'experiment' : {keytype: key },
1973                        'vtopo': vtopo,
1974                        'proof': proof.to_dict(), 
1975                    }
1976            else:
1977                state = self.state[key].status
1978        self.state_lock.release()
1979
1980        if rv: 
1981            self.log.info("vtopo call completed for %s %s " % \
1982                (key, fid))
1983            return rv
1984        else: 
1985            if state:
1986                self.log.info("vtopo call completed for %s %s (Not ready)" % \
1987                    (key, fid))
1988                raise service_error(service_error.partial, 
1989                        "Not ready: %s" % state)
1990            else:
1991                self.log.info("vtopo call completed for %s %s (No experiment)"\
1992                        % (key, fid))
1993                raise service_error(service_error.req, "No such experiment")
1994
1995    def get_vis(self, req, fid):
1996        """
1997        Return the stored visualization for this experiment
1998        """
1999        rv = None
2000        state = None
2001
2002        self.log.info("vis call started for %s" %  fid)
2003        req = req.get('VisRequestBody', None)
2004        if not req:
2005            raise service_error(service_error.req,
2006                    "Bad request format (no VisRequestBody)")
2007        exp = req.get('experiment', None)
2008        if exp:
2009            if exp.has_key('fedid'):
2010                key = exp['fedid']
2011                keytype = "fedid"
2012            elif exp.has_key('localname'):
2013                key = exp['localname']
2014                keytype = "localname"
2015            else:
2016                raise service_error(service_error.req, "Unknown lookup type")
2017        else:
2018            raise service_error(service_error.req, "No request?")
2019
2020        try:
2021            proof = self.check_experiment_access(fid, key)
2022        except service_error, e:
2023            self.log.info("vis call failed for %s: access denied" %  fid)
2024            raise e
2025
2026        self.state_lock.acquire()
2027        # Generate the visualization
2028        if key in self.state:
2029            if self.state[key].top is not None:
2030                try:
2031                    vis = self.genviz(
2032                            topdl.topology_to_vtopo(self.state[key].top))
2033                except service_error, e:
2034                    self.state_lock.release()
2035                    raise e
2036                rv =  { 'experiment' : {keytype: key },
2037                        'vis': vis,
2038                        'proof': proof.to_dict(), 
2039                        }
2040            else:
2041                state = self.state[key].status
2042        self.state_lock.release()
2043
2044        if rv: 
2045            self.log.info("vis call completed for %s %s " % \
2046                (key, fid))
2047            return rv
2048        else:
2049            if state:
2050                self.log.info("vis call completed for %s %s (not ready)" % \
2051                    (key, fid))
2052                raise service_error(service_error.partial, 
2053                        "Not ready: %s" % state)
2054            else:
2055                self.log.info("vis call completed for %s %s (no experiment)" % \
2056                    (key, fid))
2057                raise service_error(service_error.req, "No such experiment")
2058
2059   
2060    def save_federant_information(self, allocated, tbparams, eid, top):
2061        """
2062        Store the various data that have changed in the experiment state
2063        between when it was started and the beginning of resource allocation.
2064        This is basically the information about each local allocation.  This
2065        fills in the values of the placeholder allocation in the state.  It
2066        also collects the access proofs and returns them as dicts for a
2067        response message.
2068        """
2069        self.state_lock.acquire()
2070        exp = self.state[eid]
2071        exp.top = top.clone()
2072        # save federant information
2073        for k in allocated.keys():
2074            exp.add_allocation(tbparams[k])
2075            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2076                type="testbed", localname=[k], 
2077                service=[ s.to_topdl() for s in tbparams[k].services]))
2078
2079        # Access proofs for the response message
2080        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2081                    for p in tbparams[k].proof]
2082        exp.updated()
2083        if self.state_filename: 
2084            self.write_state()
2085        self.state_lock.release()
2086        return proofs
2087
2088    def clear_placeholder(self, eid, expid, tmpdir):
2089        """
2090        Clear the placeholder and remove any allocated temporary dir.
2091        """
2092
2093        self.state_lock.acquire()
2094        del self.state[eid]
2095        del self.state[expid]
2096        if self.state_filename: self.write_state()
2097        self.state_lock.release()
2098        if tmpdir and self.cleanup:
2099            self.remove_dirs(tmpdir)
2100
2101    # end of create_experiment sub-functions
2102
2103    def create_experiment(self, req, fid):
2104        """
2105        The external interface to experiment creation called from the
2106        dispatcher.
2107
2108        Creates a working directory, splits the incoming description using the
2109        splitter script and parses out the various subsections using the
2110        classes above.  Once each sub-experiment is created, use pooled threads
2111        to instantiate them and start it all up.
2112        """
2113
2114        self.log.info("Create experiment call started for %s" % fid)
2115        req = req.get('CreateRequestBody', None)
2116        if req:
2117            key = self.get_experiment_key(req)
2118        else:
2119            raise service_error(service_error.req,
2120                    "Bad request format (no CreateRequestBody)")
2121
2122        # Import information from the requester
2123        # import may partially succeed so always save credentials and warn
2124        if not self.auth.import_credentials(data_list=req.get('credential', [])):
2125            self.log.debug("Failed to import delegation credentials(!)")
2126        self.auth.save()
2127
2128        try:
2129            # Make sure that the caller can talk to us
2130            proof = self.check_experiment_access(fid, key)
2131        except service_error, e:
2132            self.log.info("Create experiment call failed for %s: access denied"\
2133                    % fid)
2134            raise e
2135
2136
2137        # Install the testbed map entries supplied with the request into a copy
2138        # of the testbed map.
2139        tbmap = dict(self.tbmap)
2140        tbactive = set(self.tbactive)
2141        for m in req.get('testbedmap', []):
2142            if 'testbed' in m and 'uri' in m:
2143                tbmap[m['testbed']] = m['uri']
2144                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2145
2146        # a place to work
2147        try:
2148            tmpdir = tempfile.mkdtemp(prefix="split-")
2149            os.mkdir(tmpdir+"/keys")
2150        except EnvironmentError:
2151            raise service_error(service_error.internal, "Cannot create tmp dir")
2152
2153        tbparams = { }
2154
2155        eid, expid, expcert_file = \
2156                self.get_experiment_ids_and_start(key, tmpdir)
2157
2158        # This catches exceptions to clear the placeholder if necessary
2159        try: 
2160            if not (eid and expid):
2161                raise service_error(service_error.internal, 
2162                        "Cannot find local experiment info!?")
2163
2164            top = self.get_topology(req, tmpdir)
2165            self.confirm_software(top)
2166            # Assign the IPs
2167            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2168            # Find the testbeds to look up
2169            tb_hosts = { }
2170            testbeds = [ ]
2171            for e in top.elements:
2172                if isinstance(e, topdl.Computer):
2173                    tb = e.get_attribute('testbed') or 'default'
2174                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2175                    else: 
2176                        tb_hosts[tb] = [ e.name ]
2177                        testbeds.append(tb)
2178
2179            masters, pmasters = self.get_testbed_services(req, testbeds)
2180            allocated = { }         # Testbeds we can access
2181            topo ={ }               # Sub topologies
2182            connInfo = { }          # Connection information
2183
2184            self.split_topology(top, topo, testbeds)
2185
2186            self.get_access_to_testbeds(testbeds, fid, allocated, 
2187                    tbparams, masters, tbmap, expid, expcert_file)
2188
2189            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2190
2191            part = experiment_partition(self.auth, self.store_url, tbmap,
2192                    self.muxmax, self.direct_transit, tbactive)
2193            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2194                    connInfo, expid)
2195
2196            auth_attrs = set()
2197            # Now get access to the dynamic testbeds (those added above)
2198            for tb in [ t for t in topo if t not in allocated]:
2199                self.get_access(tb, tbparams, fid, masters, tbmap, 
2200                        expid, expcert_file)
2201                allocated[tb] = 1
2202                store_keys = topo[tb].get_attribute('store_keys')
2203                # Give the testbed access to keys it exports or imports
2204                if store_keys:
2205                    auth_attrs.update(set([
2206                        (tbparams[tb].allocID, sk) \
2207                                for sk in store_keys.split(" ")]))
2208
2209            if auth_attrs:
2210                self.append_experiment_authorization(expid, auth_attrs)
2211
2212            # transit and disconnected testbeds may not have a connInfo entry.
2213            # Fill in the blanks.
2214            for t in allocated.keys():
2215                if not connInfo.has_key(t):
2216                    connInfo[t] = { }
2217
2218            self.wrangle_software(expid, top, topo, tbparams)
2219
2220            proofs = self.save_federant_information(allocated, tbparams, 
2221                    eid, top)
2222        except service_error, e:
2223            # If something goes wrong in the parse (usually an access error)
2224            # clear the placeholder state.  From here on out the code delays
2225            # exceptions.  Failing at this point returns a fault to the remote
2226            # caller.
2227
2228            self.log.info("Create experiment call failed for %s %s: %s" % 
2229                    (eid, fid, e))
2230            self.clear_placeholder(eid, expid, tmpdir)
2231            raise e
2232
2233        # Start the background swapper and return the starting state.  From
2234        # here on out, the state will stick around a while.
2235
2236        # Create a logger that logs to the experiment's state object as well as
2237        # to the main log file.
2238        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2239        alloc_collector = self.list_log(self.state[eid].log)
2240        h = logging.StreamHandler(alloc_collector)
2241        # XXX: there should be a global one of these rather than repeating the
2242        # code.
2243        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2244                    '%d %b %y %H:%M:%S'))
2245        alloc_log.addHandler(h)
2246
2247        # Start a thread to do the resource allocation
2248        t  = Thread(target=self.allocate_resources,
2249                args=(allocated, masters, eid, expid, tbparams, 
2250                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2251                    connInfo, tbmap, expcert_file),
2252                name=eid)
2253        t.start()
2254
2255        rv = {
2256                'experimentID': [
2257                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2258                ],
2259                'experimentStatus': 'starting',
2260                'proof': [ proof.to_dict() ] + proofs,
2261            }
2262        self.log.info("Create experiment call succeeded for %s %s" % \
2263                (eid, fid))
2264
2265        return rv
2266   
2267    def get_experiment_fedid(self, key):
2268        """
2269        find the fedid associated with the localname key in the state database.
2270        """
2271
2272        rv = None
2273        self.state_lock.acquire()
2274        if key in self.state:
2275            rv = self.state[key].fedid
2276        self.state_lock.release()
2277        return rv
2278
2279    def check_experiment_access(self, fid, key):
2280        """
2281        Confirm that the fid has access to the experiment.  Though a request
2282        may be made in terms of a local name, the access attribute is always
2283        the experiment's fedid.
2284        """
2285        if not isinstance(key, fedid):
2286            key = self.get_experiment_fedid(key)
2287
2288        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2289
2290        if access_ok:
2291            return proof
2292        else:
2293            raise service_error(service_error.access, "Access Denied",
2294                proof)
2295
2296
2297    def get_handler(self, path, fid):
2298        """
2299        Perhaps surprisingly named, this function handles HTTP GET requests to
2300        this server (SOAP requests are POSTs).
2301        """
2302        self.log.info("Get handler %s %s" % (path, fid))
2303        # XXX: log proofs?
2304        if self.auth.check_attribute(fid, path):
2305            return ("%s/%s" % (self.repodir, path), "application/binary")
2306        else:
2307            return (None, None)
2308
2309    def update_info(self, key, force=False):
2310        top = None
2311        self.state_lock.acquire()
2312        if key in self.state:
2313            if force or self.state[key].older_than(self.info_cache_limit):
2314                top = self.state[key].top
2315                if top is not None: top = top.clone()
2316                d1, info_params, cert, d2 = \
2317                        self.get_segment_info(self.state[key], need_lock=False)
2318        self.state_lock.release()
2319
2320        if top is None: return
2321
2322        try:
2323            tmpdir = tempfile.mkdtemp(prefix="info-")
2324        except EnvironmentError:
2325            raise service_error(service_error.internal, 
2326                    "Cannot create tmp dir")
2327        cert_file = self.make_temp_certfile(cert, tmpdir)
2328
2329        data = []
2330        try:
2331            for k, (uri, aid) in info_params.items():
2332                info=self.info_segment(log=self.log, testbed=uri,
2333                            cert_file=cert_file, cert_pwd=None,
2334                            trusted_certs=self.trusted_certs,
2335                            caller=self.call_InfoSegment)
2336                info(uri, aid)
2337                data.append(info)
2338        # Clean up the tmpdir no matter what
2339        finally:
2340            if tmpdir: self.remove_dirs(tmpdir)
2341
2342        self.annotate_topology(top, data)
2343        self.state_lock.acquire()
2344        if key in self.state:
2345            self.state[key].top = top
2346            self.state[key].updated()
2347            if self.state_filename: self.write_state()
2348        self.state_lock.release()
2349
2350   
2351    def get_info(self, req, fid):
2352        """
2353        Return all the stored info about this experiment
2354        """
2355        rv = None
2356
2357        self.log.info("Info call started for %s" %  fid)
2358        req = req.get('InfoRequestBody', None)
2359        if not req:
2360            raise service_error(service_error.req,
2361                    "Bad request format (no InfoRequestBody)")
2362        exp = req.get('experiment', None)
2363        legacy = req.get('legacy', False)
2364        fresh = req.get('fresh', False)
2365        if exp:
2366            if exp.has_key('fedid'):
2367                key = exp['fedid']
2368                keytype = "fedid"
2369            elif exp.has_key('localname'):
2370                key = exp['localname']
2371                keytype = "localname"
2372            else:
2373                raise service_error(service_error.req, "Unknown lookup type")
2374        else:
2375            raise service_error(service_error.req, "No request?")
2376
2377        try:
2378            proof = self.check_experiment_access(fid, key)
2379        except service_error, e:
2380            self.log.info("Info call failed for %s: access denied" %  fid)
2381
2382
2383        self.update_info(key, fresh)
2384
2385        self.state_lock.acquire()
2386        if self.state.has_key(key):
2387            rv = self.state[key].get_info()
2388            # Copy the topo if we need legacy annotations
2389            if legacy:
2390                top = self.state[key].top
2391                if top is not None: top = top.clone()
2392        self.state_lock.release()
2393        self.log.info("Gathered Info for %s %s" % (key, fid))
2394
2395        # If the legacy visualization and topology representations are
2396        # requested, calculate them and add them to the return.
2397        if legacy and rv is not None:
2398            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2399            if top is not None:
2400                vtopo = topdl.topology_to_vtopo(top)
2401                if vtopo is not None:
2402                    rv['vtopo'] = vtopo
2403                    try:
2404                        vis = self.genviz(vtopo)
2405                    except service_error, e:
2406                        self.log.debug('Problem generating visualization: %s' \
2407                                % e)
2408                        vis = None
2409                    if vis is not None:
2410                        rv['vis'] = vis
2411        if rv:
2412            self.log.info("Info succeded for %s %s" % (key, fid))
2413            rv['proof'] = proof.to_dict()
2414            return rv
2415        else: 
2416            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2417            raise service_error(service_error.req, "No such experiment")
2418
2419    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2420            results):
2421        """
2422        Call OperateSegment on multiple testbeds and gather the results.
2423        op_params contains the parameters needed to contact that testbed, cert
2424        is a certificate containing the fedid to use, op is the operation,
2425        testbeds is a dict mapping testbed name to targets in that testbed,
2426        params are the parameters to include a,d results is a growing list of
2427        the results of the calls.
2428        """
2429        try:
2430            tmpdir = tempfile.mkdtemp(prefix="info-")
2431        except EnvironmentError:
2432            raise service_error(service_error.internal, 
2433                    "Cannot create tmp dir")
2434        cert_file = self.make_temp_certfile(cert, tmpdir)
2435
2436        try:
2437            for tb, targets in testbeds.items():
2438                if tb in op_params:
2439                    uri, aid = op_params[tb]
2440                    operate=self.operation_segment(log=self.log, testbed=uri,
2441                                cert_file=cert_file, cert_pwd=None,
2442                                trusted_certs=self.trusted_certs,
2443                                caller=self.call_OperationSegment)
2444                    if operate(uri, aid, op, targets, params):
2445                        if operate.status is not None:
2446                            results.extend(operate.status)
2447                            continue
2448                # Something went wrong in a weird way.  Add statuses
2449                # that reflect that to results
2450                for t in targets:
2451                    results.append(operation_status(t, 
2452                        operation_status.federant,
2453                        'Unexpected error on %s' % tb))
2454        # Clean up the tmpdir no matter what
2455        finally:
2456            if tmpdir: self.remove_dirs(tmpdir)
2457
2458    def do_operation(self, req, fid):
2459        """
2460        Find the testbeds holding each target and ask them to carry out the
2461        operation.  Return the statuses.
2462        """
2463        # Map an element to the testbed containing it
2464        def element_to_tb(e):
2465            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2466            elif isinstance(e, topdl.Testbed): return e.name
2467            else: return None
2468        # If d is an operation_status object, make it a dict
2469        def make_dict(d):
2470            if isinstance(d, dict): return d
2471            elif isinstance(d, operation_status): return d.to_dict()
2472            else: return { }
2473
2474        def element_name(e):
2475            if isinstance(e, topdl.Computer): return e.name
2476            elif isinstance(e, topdl.Testbed): 
2477                if e.localname: return e.localname[0]
2478                else: return None
2479            else: return None
2480
2481        self.log.info("Operation call started for %s" %  fid)
2482        req = req.get('OperationRequestBody', None)
2483        if not req:
2484            raise service_error(service_error.req,
2485                    "Bad request format (no OperationRequestBody)")
2486        exp = req.get('experiment', None)
2487        op = req.get('operation', None)
2488        targets = set(req.get('target', []))
2489        params = req.get('parameter', None)
2490
2491        if exp:
2492            if 'fedid' in exp:
2493                key = exp['fedid']
2494                keytype = "fedid"
2495            elif 'localname' in exp:
2496                key = exp['localname']
2497                keytype = "localname"
2498            else:
2499                raise service_error(service_error.req, "Unknown lookup type")
2500        else:
2501            raise service_error(service_error.req, "No request?")
2502
2503        if op is None or not targets:
2504            raise service_error(service_error.req, "No request?")
2505
2506        try:
2507            proof = self.check_experiment_access(fid, key)
2508        except service_error, e:
2509            self.log.info("Operation call failed for %s: access denied" %  fid)
2510            raise e
2511
2512        self.state_lock.acquire()
2513        if key in self.state:
2514            d1, op_params, cert, d2 = \
2515                    self.get_segment_info(self.state[key], need_lock=False,
2516                            key='tb')
2517            top = self.state[key].top
2518            if top is not None:
2519                top = top.clone()
2520        self.state_lock.release()
2521
2522        if top is None:
2523            self.log.info("Operation call failed for %s: not active" %  fid)
2524            raise service_error(service_error.partial, "No topology yet", 
2525                    proof=proof)
2526
2527        testbeds = { }
2528        results = []
2529        for e in top.elements:
2530            ename = element_name(e)
2531            if ename in targets:
2532                tb = element_to_tb(e)
2533                targets.remove(ename)
2534                if tb is not None:
2535                    if tb in testbeds: testbeds[tb].append(ename)
2536                    else: testbeds[tb] = [ ename ]
2537                else:
2538                    results.append(operation_status(e.name, 
2539                        code=operation_status.no_target, 
2540                        description='Cannot map target to testbed'))
2541
2542        for t in targets:
2543            results.append(operation_status(t, operation_status.no_target))
2544
2545        self.operate_on_segments(op_params, cert, op, testbeds, params,
2546                results)
2547
2548        self.log.info("Operation call succeeded for %s" %  fid)
2549        return { 
2550                'experiment': exp, 
2551                'status': [make_dict(r) for r in results],
2552                'proof': proof.to_dict()
2553                }
2554
2555
2556    def get_multi_info(self, req, fid):
2557        """
2558        Return all the stored info that this fedid can access
2559        """
2560        rv = { 'info': [ ], 'proof': [ ] }
2561
2562        self.log.info("Multi Info call started for %s" %  fid)
2563        self.state_lock.acquire()
2564        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2565            try:
2566                proof = self.check_experiment_access(fid, key)
2567            except service_error, e:
2568                if e.code == service_error.access:
2569                    continue
2570                else:
2571                    self.log.info("Multi Info call failed for %s: %s" %  \
2572                            (e,fid))
2573                    self.state_lock.release()
2574                    raise e
2575
2576            if self.state.has_key(key):
2577                e = self.state[key].get_info()
2578                e['proof'] = proof.to_dict()
2579                rv['info'].append(e)
2580                rv['proof'].append(proof.to_dict())
2581        self.state_lock.release()
2582        self.log.info("Multi Info call succeeded for %s" %  fid)
2583        return rv
2584
2585    def check_termination_status(self, fed_exp, force):
2586        """
2587        Confirm that the experiment is sin a valid state to stop (or force it)
2588        return the state - invalid states for deletion and force settings cause
2589        exceptions.
2590        """
2591        self.state_lock.acquire()
2592        status = fed_exp.status
2593
2594        if status:
2595            if status in ('starting', 'terminating'):
2596                if not force:
2597                    self.state_lock.release()
2598                    raise service_error(service_error.partial, 
2599                            'Experiment still being created or destroyed')
2600                else:
2601                    self.log.warning('Experiment in %s state ' % status + \
2602                            'being terminated by force.')
2603            self.state_lock.release()
2604            return status
2605        else:
2606            # No status??? trouble
2607            self.state_lock.release()
2608            raise service_error(service_error.internal,
2609                    "Experiment has no status!?")
2610
2611    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2612        ids = []
2613        term_params = { }
2614        if need_lock: self.state_lock.acquire()
2615        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2616        expcert = fed_exp.identity
2617        repo = "%s" % fed_exp.fedid
2618
2619        # Collect the allocation/segment ids into a dict keyed by the fedid
2620        # of the allocation that contains a tuple of uri, aid
2621        for i, fed in enumerate(fed_exp.get_all_allocations()):
2622            uri = fed.uri
2623            aid = fed.allocID
2624            if key == 'aid': term_params[aid] = (uri, aid)
2625            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2626
2627        if need_lock: self.state_lock.release()
2628        return ids, term_params, expcert, repo
2629
2630
2631    def get_termination_info(self, fed_exp):
2632        self.state_lock.acquire()
2633        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2634        # Change the experiment state
2635        fed_exp.status = 'terminating'
2636        fed_exp.updated()
2637        if self.state_filename: self.write_state()
2638        self.state_lock.release()
2639
2640        return ids, term_params, expcert, repo
2641
2642
2643    def deallocate_resources(self, term_params, expcert, status, force, 
2644            dealloc_log):
2645        tmpdir = None
2646        # This try block makes sure the tempdir is cleared
2647        try:
2648            # If no expcert, try the deallocation as the experiment
2649            # controller instance.
2650            if expcert and self.auth_type != 'legacy': 
2651                try:
2652                    tmpdir = tempfile.mkdtemp(prefix="term-")
2653                except EnvironmentError:
2654                    raise service_error(service_error.internal, 
2655                            "Cannot create tmp dir")
2656                cert_file = self.make_temp_certfile(expcert, tmpdir)
2657                pw = None
2658            else: 
2659                cert_file = self.cert_file
2660                pw = self.cert_pwd
2661
2662            # Stop everyone.  NB, wait_for_all waits until a thread starts
2663            # and then completes, so we can't wait if nothing starts.  So,
2664            # no tbparams, no start.
2665            if len(term_params) > 0:
2666                tp = thread_pool(self.nthreads)
2667                for k, (uri, aid) in term_params.items():
2668                    # Create and start a thread to stop the segment
2669                    tp.wait_for_slot()
2670                    t  = pooled_thread(\
2671                            target=self.terminate_segment(log=dealloc_log,
2672                                testbed=uri,
2673                                cert_file=cert_file, 
2674                                cert_pwd=pw,
2675                                trusted_certs=self.trusted_certs,
2676                                caller=self.call_TerminateSegment),
2677                            args=(uri, aid), name=k,
2678                            pdata=tp, trace_file=self.trace_file)
2679                    t.start()
2680                # Wait for completions
2681                tp.wait_for_all_done()
2682
2683            # release the allocations (failed experiments have done this
2684            # already, and starting experiments may be in odd states, so we
2685            # ignore errors releasing those allocations
2686            try: 
2687                for k, (uri, aid)  in term_params.items():
2688                    self.release_access(None, aid, uri=uri,
2689                            cert_file=cert_file, cert_pwd=pw)
2690            except service_error, e:
2691                if status != 'failed' and not force:
2692                    raise e
2693
2694        # Clean up the tmpdir no matter what
2695        finally:
2696            if tmpdir: self.remove_dirs(tmpdir)
2697
2698    def terminate_experiment(self, req, fid):
2699        """
2700        Swap this experiment out on the federants and delete the shared
2701        information
2702        """
2703        self.log.info("Terminate experiment call started for %s" % fid)
2704        tbparams = { }
2705        req = req.get('TerminateRequestBody', None)
2706        if not req:
2707            raise service_error(service_error.req,
2708                    "Bad request format (no TerminateRequestBody)")
2709
2710        key = self.get_experiment_key(req, 'experiment')
2711        try:
2712            proof = self.check_experiment_access(fid, key)
2713        except service_error, e:
2714            self.log.info(
2715                    "Terminate experiment call failed for %s: access denied" \
2716                            % fid)
2717            raise e
2718        exp = req.get('experiment', False)
2719        force = req.get('force', False)
2720
2721        dealloc_list = [ ]
2722
2723
2724        # Create a logger that logs to the dealloc_list as well as to the main
2725        # log file.
2726        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2727        dealloc_log.info("Terminating %s " %key)
2728        h = logging.StreamHandler(self.list_log(dealloc_list))
2729        # XXX: there should be a global one of these rather than repeating the
2730        # code.
2731        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2732                    '%d %b %y %H:%M:%S'))
2733        dealloc_log.addHandler(h)
2734
2735        self.state_lock.acquire()
2736        fed_exp = self.state.get(key, None)
2737        self.state_lock.release()
2738        repo = None
2739
2740        if fed_exp:
2741            status = self.check_termination_status(fed_exp, force)
2742            # get_termination_info updates the experiment state
2743            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2744            self.deallocate_resources(term_params, expcert, status, force, 
2745                    dealloc_log)
2746
2747            # Remove the terminated experiment
2748            self.state_lock.acquire()
2749            for id in ids:
2750                self.clear_experiment_authorization(id, need_state_lock=False)
2751                if id in self.state: del self.state[id]
2752
2753            if self.state_filename: self.write_state()
2754            self.state_lock.release()
2755
2756            # Delete any synch points associated with this experiment.  All
2757            # synch points begin with the fedid of the experiment.
2758            fedid_keys = set(["fedid:%s" % f for f in ids \
2759                    if isinstance(f, fedid)])
2760            for k in self.synch_store.all_keys():
2761                try:
2762                    if len(k) > 45 and k[0:46] in fedid_keys:
2763                        self.synch_store.del_value(k)
2764                except synch_store.BadDeletionError:
2765                    pass
2766            self.write_store()
2767
2768            # Remove software and other cached stuff from the filesystem.
2769            if repo:
2770                self.remove_dirs("%s/%s" % (self.repodir, repo))
2771       
2772            self.log.info("Terminate experiment succeeded for %s %s" % \
2773                    (key, fid))
2774            return { 
2775                    'experiment': exp , 
2776                    'deallocationLog': string.join(dealloc_list, ''),
2777                    'proof': [proof.to_dict()],
2778                    }
2779        else:
2780            self.log.info("Terminate experiment failed for %s %s: no state" % \
2781                    (key, fid))
2782            raise service_error(service_error.req, "No saved state")
2783
2784
2785    def GetValue(self, req, fid):
2786        """
2787        Get a value from the synchronized store
2788        """
2789        req = req.get('GetValueRequestBody', None)
2790        if not req:
2791            raise service_error(service_error.req,
2792                    "Bad request format (no GetValueRequestBody)")
2793       
2794        name = req.get('name', None)
2795        wait = req.get('wait', False)
2796        rv = { 'name': name }
2797
2798        if not name:
2799            raise service_error(service_error.req, "No name?")
2800
2801        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2802
2803        if access_ok:
2804            self.log.debug("[GetValue] asking for %s " % name)
2805            try:
2806                v = self.synch_store.get_value(name, wait)
2807            except synch_store.RevokedKeyError:
2808                # No more synch on this key
2809                raise service_error(service_error.federant, 
2810                        "Synch key %s revoked" % name)
2811            if v is not None:
2812                rv['value'] = v
2813            rv['proof'] = proof.to_dict()
2814            self.log.debug("[GetValue] got %s from %s" % (v, name))
2815            return rv
2816        else:
2817            raise service_error(service_error.access, "Access Denied",
2818                    proof=proof)
2819       
2820
2821    def SetValue(self, req, fid):
2822        """
2823        Set a value in the synchronized store
2824        """
2825        req = req.get('SetValueRequestBody', None)
2826        if not req:
2827            raise service_error(service_error.req,
2828                    "Bad request format (no SetValueRequestBody)")
2829       
2830        name = req.get('name', None)
2831        v = req.get('value', '')
2832
2833        if not name:
2834            raise service_error(service_error.req, "No name?")
2835
2836        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2837
2838        if access_ok:
2839            try:
2840                self.synch_store.set_value(name, v)
2841                self.write_store()
2842                self.log.debug("[SetValue] set %s to %s" % (name, v))
2843            except synch_store.CollisionError:
2844                # Translate into a service_error
2845                raise service_error(service_error.req,
2846                        "Value already set: %s" %name)
2847            except synch_store.RevokedKeyError:
2848                # No more synch on this key
2849                raise service_error(service_error.federant, 
2850                        "Synch key %s revoked" % name)
2851                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2852        else:
2853            raise service_error(service_error.access, "Access Denied",
2854                    proof=proof)
Note: See TracBrowser for help on using the repository browser.