source: fedd/federation/experiment_control.py @ 4708875

compt_changes
Last change on this file since 4708875 was 4708875, checked in by Ted Faber <faber@…>, 12 years ago

Deal with grouper errors

  • Property mode set to 100644
File size: 94.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46import list_log
47
48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
55class experiment_control_local(experiment_control_legacy):
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
67    call_StartSegment = service_caller('StartSegment')
68    call_TerminateSegment = service_caller('TerminateSegment')
69    call_InfoSegment = service_caller('InfoSegment')
70    call_OperationSegment = service_caller('OperationSegment')
71    call_Ns2Topdl = service_caller('Ns2Topdl')
72
73    def __init__(self, config=None, auth=None):
74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
92        self.list_log = list_log.list_log
93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
104        self.repodir = config.get("experiment_control", "repodir")
105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
107
108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
112        self.nthreads = 10
113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
129        self.grouper_url = config.get("experiment_control", "grouper_url")
130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
135        dt = config.get("experiment_control", "direct_transit")
136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
146
147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
154
155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
186            self.get_access = self.legacy_get_access
187        elif self.auth_type == 'abac':
188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
216        # Dispatch tables
217        self.soap_services = {\
218                'New': soap_handler('New', self.new_experiment),
219                'Create': soap_handler('Create', self.create_experiment),
220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
224                'Operation': soap_handler('Operation', self.do_operation),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'Operation': xmlrpc_handler('Operation', self.do_operation),
241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
243        }
244
245    # Call while holding self.state_lock
246    def write_state(self):
247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
259        except EnvironmentError, e:
260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
266
267    @staticmethod
268    def get_alloc_ids(exp):
269        """
270        Used by read_store and read state.  This used to be worse.
271        """
272
273        return [ a.allocID for a in exp.get_all_allocations() ]
274
275
276    # Call while holding self.state_lock
277    def read_state(self):
278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283       
284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
289        except EnvironmentError, e:
290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
296        for s in self.state.values():
297            try:
298
299                eid = s.fedid
300                if eid : 
301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
304                        #self.auth.set_attribute(s['owner'], eid)
305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
314                else:
315                    raise KeyError("No experiment id")
316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
319
320    def read_mapdb(self, file):
321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
324        also adds testbeds to active if they include , active after
325        their name.
326        """
327
328        self.tbmap = { }
329        self.tbactive = set()
330        lineno =0
331        try:
332            f = open(file, "r")
333            for line in f:
334                lineno += 1
335                line = line.strip()
336                if line.startswith('#') or len(line) == 0:
337                    continue
338                try:
339                    label, url = line.split(':', 1)
340                    if ',' in label:
341                        label, act = label.split(',', 1)
342                        active = (act.strip() == 'active')
343                    else:
344                        active = False
345                    self.tbmap[label] = url
346                    if active: self.tbactive.add(label)
347                except ValueError, e:
348                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
349                            "map db: %s %s" % (lineno, line, e))
350        except EnvironmentError, e:
351            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
352                    "open %s: %s" % (file, e))
353        else:
354            f.close()
355
356    def read_store(self):
357        try:
358            self.synch_store = synch_store()
359            self.synch_store.load(self.store_filename)
360            self.log.debug("[read_store]: Read store from %s" % \
361                    self.store_filename)
362        except EnvironmentError, e:
363            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
364                    % (self.state_filename, e))
365            self.synch_store = synch_store()
366
367        # Set the initial permissions on data in the store.  XXX: This ad hoc
368        # authorization attribute initialization is getting out of hand.
369        # XXX: legacy
370        if self.auth_type == 'legacy':
371            for k in self.synch_store.all_keys():
372                try:
373                    if k.startswith('fedid:'):
374                        fid = fedid(hexstr=k[6:46])
375                        if self.state.has_key(fid):
376                            for a in self.get_alloc_ids(self.state[fid]):
377                                self.auth.set_attribute(a, k)
378                except ValueError, e:
379                    self.log.warn("Cannot deduce permissions for %s" % k)
380
381
382    def write_store(self):
383        """
384        Write a new copy of synch_store after writing current state
385        to a backup.  We use the internal synch_store pickle method to avoid
386        incinsistent data.
387
388        State format is a simple pickling of the store.
389        """
390        if os.access(self.store_filename, os.W_OK):
391            copy_file(self.store_filename, \
392                    "%s.bak" % self.store_filename)
393        try:
394            self.synch_store.save(self.store_filename)
395        except EnvironmentError, e:
396            self.log.error("Can't write file %s: %s" % \
397                    (self.store_filename, e))
398        except TypeError, e:
399            self.log.error("Pickling problem (TypeError): %s" % e)
400
401    # XXX this may belong somewhere else
402
403    def get_grouper_updates(self, fid):
404        if self.grouper_url is None: return
405        d = tempfile.mkdtemp()
406        try:
407            fstr = "%s" % fid
408            # XXX locking
409            zipname = os.path.join(d, 'grouper.zip')
410            dest = os.path.join(self.auth_dir, 'update')
411            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
412            f = open(zipname, 'w')
413            f.write(resp.read())
414            f.close()
415            zf = zipfile.ZipFile(zipname, 'r')
416            zf.extractall(dest)
417            zf.close()
418        except URLError, e:
419            self.log.error("Cannot connect to grouper: %s" % e)
420            pass
421        finally:
422            shutil.rmtree(d)
423
424
425    def remove_dirs(self, dir):
426        """
427        Remove the directory tree and all files rooted at dir.  Log any errors,
428        but continue.
429        """
430        self.log.debug("[removedirs]: removing %s" % dir)
431        try:
432            for path, dirs, files in os.walk(dir, topdown=False):
433                for f in files:
434                    os.remove(os.path.join(path, f))
435                for d in dirs:
436                    os.rmdir(os.path.join(path, d))
437            os.rmdir(dir)
438        except EnvironmentError, e:
439            self.log.error("Error deleting directory tree in %s" % e);
440
441    @staticmethod
442    def make_temp_certfile(expcert, tmpdir):
443        """
444        make a protected copy of the access certificate so the experiment
445        controller can act as the experiment principal.  mkstemp is the most
446        secure way to do that. The directory should be created by
447        mkdtemp.  Return the filename.
448        """
449        if expcert and tmpdir:
450            try:
451                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
452                f = os.fdopen(certf, 'w')
453                print >> f, expcert
454                f.close()
455            except EnvironmentError, e:
456                raise service_error(service_error.internal, 
457                        "Cannot create temp cert file?")
458            return certfn
459        else:
460            return None
461
462       
463    def generate_ssh_keys(self, dest, type="rsa" ):
464        """
465        Generate a set of keys for the gateways to use to talk.
466
467        Keys are of type type and are stored in the required dest file.
468        """
469        valid_types = ("rsa", "dsa")
470        t = type.lower();
471        if t not in valid_types: raise ValueError
472        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
473
474        try:
475            trace = open("/dev/null", "w")
476        except EnvironmentError:
477            raise service_error(service_error.internal,
478                    "Cannot open /dev/null??");
479
480        # May raise CalledProcessError
481        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
482        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
483        if rv != 0:
484            raise service_error(service_error.internal, 
485                    "Cannot generate nonce ssh keys.  %s return code %d" \
486                            % (self.ssh_keygen, rv))
487
488    def generate_seer_certs(self, destdir):
489        '''
490        Create a SEER ca cert and a node cert in destdir/ca.pem and
491        destdir/node.pem respectively.  These will be distributed throughout
492        the federated experiment.  This routine reports errors via
493        service_errors.
494        '''
495        openssl = '/usr/bin/openssl'
496        # All the filenames and parameters we need for openssl calls below
497        ca_key =os.path.join(destdir, 'ca.key') 
498        ca_pem = os.path.join(destdir, 'ca.pem')
499        node_key =os.path.join(destdir, 'node.key') 
500        node_pem = os.path.join(destdir, 'node.pem')
501        node_req = os.path.join(destdir, 'node.req')
502        node_signed = os.path.join(destdir, 'node.signed')
503        days = '%s' % (365 * 10)
504        serial = '%s' % random.randint(0, 1<<16)
505
506        try:
507            # Sequence of calls to create a CA key, create a ca cert, create a
508            # node key, node signing request, and finally a signed node
509            # certificate.
510            sequence = (
511                    (openssl, 'genrsa', '-out', ca_key, '1024'),
512                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
513                        ca_pem, '-days', days, '-subj', 
514                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
515                    (openssl, 'genrsa', '-out', node_key, '1024'),
516                    (openssl, 'req', '-new', '-key', node_key, '-out', 
517                        node_req, '-days', days, '-subj', 
518                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
519                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
520                        '-set_serial', serial, '-req', '-in', node_req, 
521                        '-out', node_signed, '-days', days),
522                )
523            # Do all that stuff; bail if there's an error, and push all the
524            # output to dev/null.
525            for cmd in sequence:
526                trace = open("/dev/null", "w")
527                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
528                if rv != 0:
529                    raise service_error(service_error.internal, 
530                            "Cannot generate SEER certs.  %s return code %d" \
531                                    % (' '.join(cmd), rv))
532            # Concatinate the node key and signed certificate into node.pem
533            f = open(node_pem, 'w')
534            for comp in (node_signed, node_key):
535                g = open(comp, 'r')
536                f.write(g.read())
537                g.close()
538            f.close()
539
540            # Throw out intermediaries.
541            for fn in (ca_key, node_key, node_req, node_signed):
542                os.unlink(fn)
543
544        except EnvironmentError, e:
545            # Any difficulties with the file system wind up here
546            raise service_error(service_error.internal,
547                    "File error on  %s while creating SEER certs: %s" % \
548                            (e.filename, e.strerror))
549
550
551
552    def gentopo(self, str):
553        """
554        Generate the topology data structure from the splitter's XML
555        representation of it.
556
557        The topology XML looks like:
558            <experiment>
559                <nodes>
560                    <node><vname></vname><ips>ip1:ip2</ips></node>
561                </nodes>
562                <lans>
563                    <lan>
564                        <vname></vname><vnode></vnode><ip></ip>
565                        <bandwidth></bandwidth><member>node:port</member>
566                    </lan>
567                </lans>
568        """
569        class topo_parse:
570            """
571            Parse the topology XML and create the dats structure.
572            """
573            def __init__(self):
574                # Typing of the subelements for data conversion
575                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
576                self.int_subelements = ( 'bandwidth',)
577                self.float_subelements = ( 'delay',)
578                # The final data structure
579                self.nodes = [ ]
580                self.lans =  [ ]
581                self.topo = { \
582                        'node': self.nodes,\
583                        'lan' : self.lans,\
584                    }
585                self.element = { }  # Current element being created
586                self.chars = ""     # Last text seen
587
588            def end_element(self, name):
589                # After each sub element the contents is added to the current
590                # element or to the appropriate list.
591                if name == 'node':
592                    self.nodes.append(self.element)
593                    self.element = { }
594                elif name == 'lan':
595                    self.lans.append(self.element)
596                    self.element = { }
597                elif name in self.str_subelements:
598                    self.element[name] = self.chars
599                    self.chars = ""
600                elif name in self.int_subelements:
601                    self.element[name] = int(self.chars)
602                    self.chars = ""
603                elif name in self.float_subelements:
604                    self.element[name] = float(self.chars)
605                    self.chars = ""
606
607            def found_chars(self, data):
608                self.chars += data.rstrip()
609
610
611        tp = topo_parse();
612        parser = xml.parsers.expat.ParserCreate()
613        parser.EndElementHandler = tp.end_element
614        parser.CharacterDataHandler = tp.found_chars
615
616        parser.Parse(str)
617
618        return tp.topo
619       
620
621    def genviz(self, topo):
622        """
623        Generate the visualization the virtual topology
624        """
625
626        neato = "/usr/local/bin/neato"
627        # These are used to parse neato output and to create the visualization
628        # file.
629        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
630        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
631                "%s</type></node>"
632
633        try:
634            # Node names
635            nodes = [ n['vname'] for n in topo['node'] ]
636            topo_lans = topo['lan']
637        except KeyError, e:
638            raise service_error(service_error.internal, "Bad topology: %s" %e)
639
640        lans = { }
641        links = { }
642
643        # Walk through the virtual topology, organizing the connections into
644        # 2-node connections (links) and more-than-2-node connections (lans).
645        # When a lan is created, it's added to the list of nodes (there's a
646        # node in the visualization for the lan).
647        for l in topo_lans:
648            if links.has_key(l['vname']):
649                if len(links[l['vname']]) < 2:
650                    links[l['vname']].append(l['vnode'])
651                else:
652                    nodes.append(l['vname'])
653                    lans[l['vname']] = links[l['vname']]
654                    del links[l['vname']]
655                    lans[l['vname']].append(l['vnode'])
656            elif lans.has_key(l['vname']):
657                lans[l['vname']].append(l['vnode'])
658            else:
659                links[l['vname']] = [ l['vnode'] ]
660
661
662        # Open up a temporary file for dot to turn into a visualization
663        try:
664            df, dotname = tempfile.mkstemp()
665            dotfile = os.fdopen(df, 'w')
666        except EnvironmentError:
667            raise service_error(service_error.internal,
668                    "Failed to open file in genviz")
669
670        try:
671            dnull = open('/dev/null', 'w')
672        except EnvironmentError:
673            service_error(service_error.internal,
674                    "Failed to open /dev/null in genviz")
675
676        # Generate a dot/neato input file from the links, nodes and lans
677        try:
678            print >>dotfile, "graph G {"
679            for n in nodes:
680                print >>dotfile, '\t"%s"' % n
681            for l in links.keys():
682                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
683            for l in lans.keys():
684                for n in lans[l]:
685                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
686            print >>dotfile, "}"
687            dotfile.close()
688        except TypeError:
689            raise service_error(service_error.internal,
690                    "Single endpoint link in vtopo")
691        except EnvironmentError:
692            raise service_error(service_error.internal, "Cannot write dot file")
693
694        # Use dot to create a visualization
695        try:
696            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
697                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
698                stderr=dnull, close_fds=True)
699        except EnvironmentError:
700            raise service_error(service_error.internal, 
701                    "Cannot generate visualization: is graphviz available?")
702        dnull.close()
703
704        # Translate dot to vis format
705        vis_nodes = [ ]
706        vis = { 'node': vis_nodes }
707        for line in dot.stdout:
708            m = vis_re.match(line)
709            if m:
710                vn = m.group(1)
711                vis_node = {'name': vn, \
712                        'x': float(m.group(2)),\
713                        'y' : float(m.group(3)),\
714                    }
715                if vn in links.keys() or vn in lans.keys():
716                    vis_node['type'] = 'lan'
717                else:
718                    vis_node['type'] = 'node'
719                vis_nodes.append(vis_node)
720        rv = dot.wait()
721
722        os.remove(dotname)
723        # XXX: graphviz seems to use low return codes for warnings, like
724        # "couldn't find font"
725        if rv < 2 : return vis
726        else: return None
727
728
729    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
730            cert_pwd=None):
731        """
732        Release access to testbed through fedd
733        """
734
735        if not uri and tbmap:
736            uri = tbmap.get(tb, None)
737        if not uri:
738            raise service_error(service_error.server_config, 
739                    "Unknown testbed: %s" % tb)
740
741        if self.local_access.has_key(uri):
742            resp = self.local_access[uri].ReleaseAccess(\
743                    { 'ReleaseAccessRequestBody' : 
744                        {'allocID': {'fedid': aid}},}, 
745                    fedid(file=cert_file))
746            resp = { 'ReleaseAccessResponseBody': resp } 
747        else:
748            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
749                    cert_file, cert_pwd, self.trusted_certs)
750
751        # better error coding
752
753    def remote_ns2topdl(self, uri, desc):
754
755        req = {
756                'description' : { 'ns2description': desc },
757            }
758
759        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
760                self.trusted_certs)
761
762        if r.has_key('Ns2TopdlResponseBody'):
763            r = r['Ns2TopdlResponseBody']
764            ed = r.get('experimentdescription', None)
765            if ed.has_key('topdldescription'):
766                return topdl.Topology(**ed['topdldescription'])
767            else:
768                raise service_error(service_error.protocol, 
769                        "Bad splitter response (no output)")
770        else:
771            raise service_error(service_error.protocol, "Bad splitter response")
772
773    class start_segment:
774        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
775                cert_pwd=None, trusted_certs=None, caller=None,
776                log_collector=None):
777            self.log = log
778            self.debug = debug
779            self.cert_file = cert_file
780            self.cert_pwd = cert_pwd
781            self.trusted_certs = None
782            self.caller = caller
783            self.testbed = testbed
784            self.log_collector = log_collector
785            self.response = None
786            self.node = { }
787            self.subs = { }
788            self.tb = { }
789            self.proof = None
790
791        def make_map(self, resp):
792            if 'segmentdescription' not in resp  or \
793                    'topdldescription' not in resp['segmentdescription']:
794                self.log.warn('No topology returned from startsegment')
795                return 
796
797            top = topdl.Topology(
798                    **resp['segmentdescription']['topdldescription'])
799
800            for e in top.elements:
801                if isinstance(e, topdl.Computer):
802                    self.node[e.name] = e
803                elif isinstance(e, topdl.Testbed):
804                    self.tb[e.uri] = e
805            for s in top.substrates:
806                self.subs[s.name] = s
807
808        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
809            req = {
810                    'allocID': { 'fedid' : aid }, 
811                    'segmentdescription': { 
812                        'topdldescription': topo.to_dict(),
813                    },
814                }
815
816            if connInfo:
817                req['connection'] = connInfo
818
819            import_svcs = [ s for m in masters.values() \
820                    for s in m if self.testbed in s.importers]
821
822            if import_svcs or self.testbed in masters:
823                req['service'] = []
824
825            for s in import_svcs:
826                for r in s.reqs:
827                    sr = copy.deepcopy(r)
828                    sr['visibility'] = 'import';
829                    req['service'].append(sr)
830
831            for s in masters.get(self.testbed, []):
832                for r in s.reqs:
833                    sr = copy.deepcopy(r)
834                    sr['visibility'] = 'export';
835                    req['service'].append(sr)
836
837            if attrs:
838                req['fedAttr'] = attrs
839
840            try:
841                self.log.debug("Calling StartSegment at %s " % uri)
842                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
843                        self.trusted_certs)
844                if r.has_key('StartSegmentResponseBody'):
845                    lval = r['StartSegmentResponseBody'].get('allocationLog',
846                            None)
847                    if lval and self.log_collector:
848                        for line in  lval.splitlines(True):
849                            self.log_collector.write(line)
850                    self.make_map(r['StartSegmentResponseBody'])
851                    if 'proof' in r: self.proof = r['proof']
852                    self.response = r
853                else:
854                    raise service_error(service_error.internal, 
855                            "Bad response!?: %s" %r)
856                return True
857            except service_error, e:
858                self.log.error("Start segment failed on %s: %s" % \
859                        (self.testbed, e))
860                return False
861
862
863
864    class terminate_segment:
865        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
866                cert_pwd=None, trusted_certs=None, caller=None):
867            self.log = log
868            self.debug = debug
869            self.cert_file = cert_file
870            self.cert_pwd = cert_pwd
871            self.trusted_certs = None
872            self.caller = caller
873            self.testbed = testbed
874
875        def __call__(self, uri, aid ):
876            req = {
877                    'allocID': {'fedid': aid }, 
878                }
879            self.log.info("Calling terminate segment")
880            try:
881                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
882                        self.trusted_certs)
883                self.log.info("Terminate segment succeeded")
884                return True
885            except service_error, e:
886                self.log.error("Terminate segment failed on %s: %s" % \
887                        (self.testbed, e))
888                return False
889
890    class info_segment(start_segment):
891        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
892                cert_pwd=None, trusted_certs=None, caller=None,
893                log_collector=None):
894            experiment_control_local.start_segment.__init__(self, debug, 
895                    log, testbed, cert_file, cert_pwd, trusted_certs, 
896                    caller, log_collector)
897
898        def __call__(self, uri, aid):
899            req = { 'allocID': { 'fedid' : aid } }
900
901            try:
902                self.log.debug("Calling InfoSegment at %s " % uri)
903                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
904                        self.trusted_certs)
905                if r.has_key('InfoSegmentResponseBody'):
906                    self.make_map(r['InfoSegmentResponseBody'])
907                    if 'proof' in r: self.proof = r['proof']
908                    self.response = r
909                else:
910                    raise service_error(service_error.internal, 
911                            "Bad response!?: %s" %r)
912                return True
913            except service_error, e:
914                self.log.error("Info segment failed on %s: %s" % \
915                        (self.testbed, e))
916                return False
917
918    class operation_segment:
919        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
920                cert_pwd=None, trusted_certs=None, caller=None,
921                log_collector=None):
922            self.log = log
923            self.debug = debug
924            self.cert_file = cert_file
925            self.cert_pwd = cert_pwd
926            self.trusted_certs = None
927            self.caller = caller
928            self.testbed = testbed
929            self.status = None
930
931        def __call__(self, uri, aid, op, targets, params):
932            req = { 
933                    'allocID': { 'fedid' : aid },
934                    'operation': op,
935                    'target': targets,
936                    }
937            if params: req['parameter'] = params
938
939
940            try:
941                self.log.debug("Calling OperationSegment at %s " % uri)
942                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
943                        self.trusted_certs)
944                if 'OperationSegmentResponseBody' in r:
945                    r = r['OperationSegmentResponseBody']
946                    if 'status' in r:
947                        self.status = r['status']
948                else:
949                    raise service_error(service_error.internal, 
950                            "Bad response!?: %s" %r)
951                return True
952            except service_error, e:
953                self.log.error("Operation segment failed on %s: %s" % \
954                        (self.testbed, e))
955                return False
956
957    def annotate_topology(self, top, data):
958        # These routines do various parts of the annotation
959        def add_new_names(nl, l):
960            """ add any names in nl to the list in l """
961            for n in nl:
962                if n not in l: l.append(n)
963       
964        def merge_services(ne, e):
965            for ns in ne.service:
966                # NB: the else is on the for
967                for s in e.service:
968                    if ns.name == s.name:
969                        s.importer = ns.importer
970                        s.param = ns.param
971                        s.description = ns.description
972                        s.status = ns.status
973                        break
974                else:
975                    e.service.append(ns)
976       
977        def merge_oses(ne, e):
978            """
979            Merge the operating system entries of ne into e
980            """
981            for nos in ne.os:
982                # NB: the else is on the for
983                for os in e.os:
984                    if nos.name == os.name:
985                        os.version = nos.version
986                        os.version = nos.distribution
987                        os.version = nos.distributionversion
988                        for a in nos.attribute:
989                            if os.get_attribute(a.attribute):
990                                os.remove_attribute(a.attribute)
991                            os.set_attribute(a.attribute, a.value)
992                        break
993                else:
994                    # If both nodes have one OS, this is a replacement
995                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
996                    else: e.os.append(nos)
997
998        # Annotate the topology with embedding info
999        for e in top.elements:
1000            if isinstance(e, topdl.Computer):
1001                for s in data:
1002                    ne = s.node.get(e.name, None)
1003                    if ne is not None:
1004                        add_new_names(ne.localname, e.localname)
1005                        e.status = ne.status
1006                        merge_services(ne, e)
1007                        add_new_names(ne.operation, e.operation)
1008                        if ne.os: merge_oses(ne, e)
1009                        break
1010            elif isinstance(e,topdl.Testbed):
1011                for s in data:
1012                    ne = s.tb.get(e.uri, None)
1013                    if ne is not None:
1014                        add_new_names(ne.localname, e.localname)
1015                        add_new_names(ne.operation, e.operation)
1016                        merge_services(ne, e)
1017                        for a in ne.attribute:
1018                            e.set_attribute(a.attribute, a.value)
1019        # Annotate substrates
1020        for s in top.substrates:
1021            for d in data:
1022                ss = d.subs.get(s.name, None)
1023                if ss is not None:
1024                    if ss.capacity is not None:
1025                        s.capacity = ss.capacity
1026                    if s.latency is not None:
1027                        s.latency = ss.latency
1028
1029
1030
1031    def allocate_resources(self, allocated, masters, eid, expid, 
1032            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1033            attrs=None, connInfo={}, tbmap=None, expcert=None):
1034
1035        started = { }           # Testbeds where a sub-experiment started
1036                                # successfully
1037
1038        # XXX
1039        fail_soft = False
1040
1041        if tbmap is None: tbmap = { }
1042
1043        log = alloc_log or self.log
1044
1045        tp = thread_pool(self.nthreads)
1046        threads = [ ]
1047        starters = [ ]
1048
1049        if expcert:
1050            cert = expcert
1051            pw = None
1052        else:
1053            cert = self.cert_file
1054            pw = self.cert_pwd
1055
1056        for tb in allocated.keys():
1057            # Create and start a thread to start the segment, and save it
1058            # to get the return value later
1059            tb_attrs = copy.copy(attrs)
1060            tp.wait_for_slot()
1061            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1062            base, suffix = split_testbed(tb)
1063            if suffix:
1064                tb_attrs.append({'attribute': 'experiment_name', 
1065                    'value': "%s-%s" % (eid, suffix)})
1066            else:
1067                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1068            if not uri:
1069                raise service_error(service_error.internal, 
1070                        "Unknown testbed %s !?" % tb)
1071
1072            aid = tbparams[tb].allocID
1073            if not aid:
1074                raise service_error(service_error.internal, 
1075                        "No alloc id for testbed %s !?" % tb)
1076
1077            s = self.start_segment(log=log, debug=self.debug,
1078                    testbed=tb, cert_file=cert,
1079                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1080                    caller=self.call_StartSegment,
1081                    log_collector=log_collector)
1082            starters.append(s)
1083            t  = pooled_thread(\
1084                    target=s, name=tb,
1085                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1086                    pdata=tp, trace_file=self.trace_file)
1087            threads.append(t)
1088            t.start()
1089
1090        # Wait until all finish (keep pinging the log, though)
1091        mins = 0
1092        revoked = False
1093        while not tp.wait_for_all_done(60.0):
1094            mins += 1
1095            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1096                    % mins)
1097            if not revoked and \
1098                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1099                # a testbed has failed.  Revoke this experiment's
1100                # synchronizarion values so that sub experiments will not
1101                # deadlock waiting for synchronization that will never happen
1102                self.log.info("A subexperiment has failed to swap in, " + \
1103                        "revoking synch keys")
1104                var_key = "fedid:%s" % expid
1105                for k in self.synch_store.all_keys():
1106                    if len(k) > 45 and k[0:46] == var_key:
1107                        self.synch_store.revoke_key(k)
1108                revoked = True
1109
1110        failed = [ t.getName() for t in threads if not t.rv ]
1111        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1112
1113        # If one failed clean up, unless fail_soft is set
1114        if failed:
1115            if not fail_soft:
1116                tp.clear()
1117                for tb in succeeded:
1118                    # Create and start a thread to stop the segment
1119                    tp.wait_for_slot()
1120                    uri = tbparams[tb].uri
1121                    t  = pooled_thread(\
1122                            target=self.terminate_segment(log=log,
1123                                testbed=tb,
1124                                cert_file=cert, 
1125                                cert_pwd=pw,
1126                                trusted_certs=self.trusted_certs,
1127                                caller=self.call_TerminateSegment),
1128                            args=(uri, tbparams[tb].allocID),
1129                            name=tb,
1130                            pdata=tp, trace_file=self.trace_file)
1131                    t.start()
1132                # Wait until all finish (if any are being stopped)
1133                if succeeded:
1134                    tp.wait_for_all_done()
1135
1136                # release the allocations
1137                for tb in tbparams.keys():
1138                    try:
1139                        self.release_access(tb, tbparams[tb].allocID, 
1140                                tbmap=tbmap, uri=tbparams[tb].uri,
1141                                cert_file=cert, cert_pwd=pw)
1142                    except service_error, e:
1143                        self.log.warn("Error releasing access: %s" % e.desc)
1144                # Remove the placeholder
1145                self.state_lock.acquire()
1146                self.state[eid].status = 'failed'
1147                self.state[eid].updated()
1148                if self.state_filename: self.write_state()
1149                self.state_lock.release()
1150                # Remove the repo dir
1151                self.remove_dirs("%s/%s" %(self.repodir, expid))
1152                # Walk up tmpdir, deleting as we go
1153                if self.cleanup:
1154                    self.remove_dirs(tmpdir)
1155                else:
1156                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1157
1158
1159                log.error("Swap in failed on %s" % ",".join(failed))
1160                return
1161        else:
1162            # Walk through the successes and gather the proofs
1163            proofs = { }
1164            for s in starters:
1165                if s.proof:
1166                    proofs[s.testbed] = s.proof
1167            self.annotate_topology(top, starters)
1168            log.info("[start_segment]: Experiment %s active" % eid)
1169
1170
1171        # Walk up tmpdir, deleting as we go
1172        if self.cleanup:
1173            self.remove_dirs(tmpdir)
1174        else:
1175            log.debug("[start_experiment]: not removing %s" % tmpdir)
1176
1177        # Insert the experiment into our state and update the disk copy.
1178        self.state_lock.acquire()
1179        self.state[expid].status = 'active'
1180        self.state[eid] = self.state[expid]
1181        self.state[eid].top = top
1182        self.state[eid].updated()
1183        # Append startup proofs
1184        for f in self.state[eid].get_all_allocations():
1185            if f.tb in proofs:
1186                f.proof.append(proofs[f.tb])
1187
1188        if self.state_filename: self.write_state()
1189        self.state_lock.release()
1190        return
1191
1192
1193    def add_kit(self, e, kit):
1194        """
1195        Add a Software object created from the list of (install, location)
1196        tuples passed as kit  to the software attribute of an object e.  We
1197        do this enough to break out the code, but it's kind of a hack to
1198        avoid changing the old tuple rep.
1199        """
1200
1201        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1202
1203        if isinstance(e.software, list): e.software.extend(s)
1204        else: e.software = s
1205
1206    def append_experiment_authorization(self, expid, attrs, 
1207            need_state_lock=True):
1208        """
1209        Append the authorization information to system state
1210        """
1211
1212        for p, a in attrs:
1213            self.auth.set_attribute(p, a)
1214        self.auth.save()
1215
1216        if need_state_lock: self.state_lock.acquire()
1217        # XXX: really a no op?
1218        #self.state[expid]['auth'].update(attrs)
1219        if self.state_filename: self.write_state()
1220        if need_state_lock: self.state_lock.release()
1221
1222    def clear_experiment_authorization(self, expid, need_state_lock=True):
1223        """
1224        Attrs is a set of attribute principal pairs that need to be removed
1225        from the authenticator.  Remove them and save the authenticator.
1226        """
1227
1228        if need_state_lock: self.state_lock.acquire()
1229        # XXX: should be a no-op
1230        #if expid in self.state and 'auth' in self.state[expid]:
1231            #for p, a in self.state[expid]['auth']:
1232                #self.auth.unset_attribute(p, a)
1233            #self.state[expid]['auth'] = set()
1234        if self.state_filename: self.write_state()
1235        if need_state_lock: self.state_lock.release()
1236        self.auth.save()
1237
1238
1239    def create_experiment_state(self, fid, req, expid, expcert,
1240            state='starting'):
1241        """
1242        Create the initial entry in the experiment's state.  The expid and
1243        expcert are the experiment's fedid and certifacte that represents that
1244        ID, which are installed in the experiment state.  If the request
1245        includes a suggested local name that is used if possible.  If the local
1246        name is already taken by an experiment owned by this user that has
1247        failed, it is overwritten.  Otherwise new letters are added until a
1248        valid localname is found.  The generated local name is returned.
1249        """
1250
1251        if req.has_key('experimentID') and \
1252                req['experimentID'].has_key('localname'):
1253            overwrite = False
1254            eid = req['experimentID']['localname']
1255            # If there's an old failed experiment here with the same local name
1256            # and accessible by this user, we'll overwrite it, otherwise we'll
1257            # fall through and do the collision avoidance.
1258            old_expid = self.get_experiment_fedid(eid)
1259            if old_expid:
1260                users_experiment = True
1261                try:
1262                    self.check_experiment_access(fid, old_expid)
1263                except service_error, e:
1264                    if e.code == service_error.access: users_experiment = False
1265                    else: raise e
1266                if users_experiment:
1267                    self.state_lock.acquire()
1268                    status = self.state[eid].status
1269                    if status and status == 'failed':
1270                        # remove the old access attributes
1271                        self.clear_experiment_authorization(eid,
1272                                need_state_lock=False)
1273                        overwrite = True
1274                        del self.state[eid]
1275                        del self.state[old_expid]
1276                    self.state_lock.release()
1277                else:
1278                    self.log.info('Experiment %s exists, ' % eid + \
1279                            'but this user cannot access it')
1280            self.state_lock.acquire()
1281            while (self.state.has_key(eid) and not overwrite):
1282                eid += random.choice(string.ascii_letters)
1283            # Initial state
1284            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1285                    identity=expcert)
1286            self.state[expid] = self.state[eid]
1287            if self.state_filename: self.write_state()
1288            self.state_lock.release()
1289        else:
1290            eid = self.exp_stem
1291            for i in range(0,5):
1292                eid += random.choice(string.ascii_letters)
1293            self.state_lock.acquire()
1294            while (self.state.has_key(eid)):
1295                eid = self.exp_stem
1296                for i in range(0,5):
1297                    eid += random.choice(string.ascii_letters)
1298            # Initial state
1299            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1300                    identity=expcert)
1301            self.state[expid] = self.state[eid]
1302            if self.state_filename: self.write_state()
1303            self.state_lock.release()
1304
1305        # Let users touch the state.  Authorize this fid and the expid itself
1306        # to touch the experiment, as well as allowing th eoverrides.
1307        self.append_experiment_authorization(eid, 
1308                set([(fid, expid), (expid,expid)] + \
1309                        [ (o, expid) for o in self.overrides]))
1310
1311        return eid
1312
1313
1314    def allocate_ips_to_topo(self, top):
1315        """
1316        Add an ip4_address attribute to all the hosts in the topology, based on
1317        the shared substrates on which they sit.  An /etc/hosts file is also
1318        created and returned as a list of hostfiles entries.  We also return
1319        the allocator, because we may need to allocate IPs to portals
1320        (specifically DRAGON portals).
1321        """
1322        subs = sorted(top.substrates, 
1323                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1324                reverse=True)
1325        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1326        ifs = { }
1327        hosts = [ ]
1328
1329        for idx, s in enumerate(subs):
1330            net_size = len(s.interfaces)+2
1331
1332            a = ips.allocate(net_size)
1333            if a :
1334                base, num = a
1335                if num < net_size: 
1336                    raise service_error(service_error.internal,
1337                            "Allocator returned wrong number of IPs??")
1338            else:
1339                raise service_error(service_error.req, 
1340                        "Cannot allocate IP addresses")
1341            mask = ips.min_alloc
1342            while mask < net_size:
1343                mask *= 2
1344
1345            netmask = ((2**32-1) ^ (mask-1))
1346
1347            base += 1
1348            for i in s.interfaces:
1349                i.attribute.append(
1350                        topdl.Attribute('ip4_address', 
1351                            "%s" % ip_addr(base)))
1352                i.attribute.append(
1353                        topdl.Attribute('ip4_netmask', 
1354                            "%s" % ip_addr(int(netmask))))
1355
1356                hname = i.element.name
1357                if ifs.has_key(hname):
1358                    hosts.append("%s\t%s-%s %s-%d" % \
1359                            (ip_addr(base), hname, s.name, hname,
1360                                ifs[hname]))
1361                else:
1362                    ifs[hname] = 0
1363                    hosts.append("%s\t%s-%s %s-%d %s" % \
1364                            (ip_addr(base), hname, s.name, hname,
1365                                ifs[hname], hname))
1366
1367                ifs[hname] += 1
1368                base += 1
1369        return hosts, ips
1370
1371    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1372            tbparam, masters, tbmap, expid=None, expcert=None):
1373        for tb in testbeds:
1374            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1375                    expcert)
1376            allocated[tb] = 1
1377
1378    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1379            expcert=None):
1380        """
1381        Get access to testbed through fedd and set the parameters for that tb
1382        """
1383        def get_export_project(svcs):
1384            """
1385            Look through for the list of federated_service for this testbed
1386            objects for a project_export service, and extract the project
1387            parameter.
1388            """
1389
1390            pe = [s for s in svcs if s.name=='project_export']
1391            if len(pe) == 1:
1392                return pe[0].params.get('project', None)
1393            elif len(pe) == 0:
1394                return None
1395            else:
1396                raise service_error(service_error.req,
1397                        "More than one project export is not supported")
1398
1399        def add_services(svcs, type, slist, keys):
1400            """
1401            Add the given services to slist.  type is import or export.  Also
1402            add a mapping entry from the assigned id to the original service
1403            record.
1404            """
1405            for i, s in enumerate(svcs):
1406                idx = '%s%d' % (type, i)
1407                keys[idx] = s
1408                sr = {'id': idx, 'name': s.name, 'visibility': type }
1409                if s.params:
1410                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1411                            for k, v in s.params.items()]
1412                slist.append(sr)
1413
1414        uri = tbmap.get(testbed_base(tb), None)
1415        if not uri:
1416            raise service_error(service_error.server_config, 
1417                    "Unknown testbed: %s" % tb)
1418
1419        export_svcs = masters.get(tb,[])
1420        import_svcs = [ s for m in masters.values() \
1421                for s in m \
1422                    if tb in s.importers ]
1423
1424        export_project = get_export_project(export_svcs)
1425        # Compose the credential list so that IDs come before attributes
1426        creds = set()
1427        keys = set()
1428        certs = self.auth.get_creds_for_principal(fid)
1429        # Append credenials about this experiment controller - e.g. that it is
1430        # trusted.
1431        certs.update(self.auth.get_creds_for_principal(
1432            fedid(file=self.cert_file)))
1433        if expid:
1434            certs.update(self.auth.get_creds_for_principal(expid))
1435        for c in certs:
1436            keys.add(c.issuer_cert())
1437            creds.add(c.attribute_cert())
1438        creds = list(keys) + list(creds)
1439
1440        if expcert: cert, pw = expcert, None
1441        else: cert, pw = self.cert_file, self.cert_pw
1442
1443        # Request credentials
1444        req = {
1445                'abac_credential': creds,
1446            }
1447        # Make the service request from the services we're importing and
1448        # exporting.  Keep track of the export request ids so we can
1449        # collect the resulting info from the access response.
1450        e_keys = { }
1451        if import_svcs or export_svcs:
1452            slist = []
1453            add_services(import_svcs, 'import', slist, e_keys)
1454            add_services(export_svcs, 'export', slist, e_keys)
1455            req['service'] = slist
1456
1457        if self.local_access.has_key(uri):
1458            # Local access call
1459            req = { 'RequestAccessRequestBody' : req }
1460            r = self.local_access[uri].RequestAccess(req, 
1461                    fedid(file=self.cert_file))
1462            r = { 'RequestAccessResponseBody' : r }
1463        else:
1464            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1465
1466        if r.has_key('RequestAccessResponseBody'):
1467            # Through to here we have a valid response, not a fault.
1468            # Access denied is a fault, so something better or worse than
1469            # access denied has happened.
1470            r = r['RequestAccessResponseBody']
1471            self.log.debug("[get_access] Access granted")
1472        else:
1473            raise service_error(service_error.protocol,
1474                        "Bad proxy response")
1475        if 'proof' not in r:
1476            raise service_error(service_error.protocol,
1477                        "Bad access response (no access proof)")
1478
1479        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1480                tb=tb, uri=uri, proof=[r['proof']], 
1481                services=masters.get(tb, None))
1482
1483        # Collect the responses corresponding to the services this testbed
1484        # exports.  These will be the service requests that we will include in
1485        # the start segment requests (with appropriate visibility values) to
1486        # import and export the segments.
1487        for s in r.get('service', []):
1488            id = s.get('id', None)
1489            # Note that this attaches the response to the object in the masters
1490            # data structure.  (The e_keys index disappears when this fcn
1491            # returns)
1492            if id and id in e_keys:
1493                e_keys[id].reqs.append(s)
1494
1495        # Add attributes to parameter space.  We don't allow attributes to
1496        # overlay any parameters already installed.
1497        for a in r.get('fedAttr', []):
1498            try:
1499                if a['attribute']:
1500                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1501            except KeyError:
1502                self.log.error("Bad attribute in response: %s" % a)
1503
1504
1505    def split_topology(self, top, topo, testbeds):
1506        """
1507        Create the sub-topologies that are needed for experiment instantiation.
1508        """
1509        for tb in testbeds:
1510            topo[tb] = top.clone()
1511            # copy in for loop allows deletions from the original
1512            for e in [ e for e in topo[tb].elements]:
1513                etb = e.get_attribute('testbed')
1514                # NB: elements without a testbed attribute won't appear in any
1515                # sub topologies. 
1516                if not etb or etb != tb:
1517                    for i in e.interface:
1518                        for s in i.subs:
1519                            try:
1520                                s.interfaces.remove(i)
1521                            except ValueError:
1522                                raise service_error(service_error.internal,
1523                                        "Can't remove interface??")
1524                    topo[tb].elements.remove(e)
1525            topo[tb].make_indices()
1526
1527    def confirm_software(self, top):
1528        """
1529        Make sure that the software to be loaded in the topo is all available
1530        before we begin making access requests, etc.  This is a subset of
1531        wrangle_software.
1532        """
1533        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1534        pkgs.update([x.location for e in top.elements for x in e.software])
1535
1536        for pkg in pkgs:
1537            loc = pkg
1538
1539            scheme, host, path = urlparse(loc)[0:3]
1540            dest = os.path.basename(path)
1541            if not scheme:
1542                if not loc.startswith('/'):
1543                    loc = "/%s" % loc
1544                loc = "file://%s" %loc
1545            # NB: if scheme was found, loc == pkg
1546            try:
1547                u = urlopen(loc)
1548                u.close()
1549            except Exception, e:
1550                raise service_error(service_error.req, 
1551                        "Cannot open %s: %s" % (loc, e))
1552        return True
1553
1554    def wrangle_software(self, expid, top, topo, tbparams):
1555        """
1556        Copy software out to the repository directory, allocate permissions and
1557        rewrite the segment topologies to look for the software in local
1558        places.
1559        """
1560
1561        # Copy the rpms and tarfiles to a distribution directory from
1562        # which the federants can retrieve them
1563        linkpath = "%s/software" %  expid
1564        softdir ="%s/%s" % ( self.repodir, linkpath)
1565        softmap = { }
1566
1567        # self.fedkit and self.gateway kit are lists of tuples of
1568        # (install_location, download_location) this extracts the download
1569        # locations.
1570        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1571        pkgs.update([x.location for e in top.elements for x in e.software])
1572        try:
1573            os.makedirs(softdir)
1574        except EnvironmentError, e:
1575            raise service_error(
1576                    "Cannot create software directory: %s" % e)
1577        # The actual copying.  Everything's converted into a url for copying.
1578        auth_attrs = set()
1579        for pkg in pkgs:
1580            loc = pkg
1581
1582            scheme, host, path = urlparse(loc)[0:3]
1583            dest = os.path.basename(path)
1584            if not scheme:
1585                if not loc.startswith('/'):
1586                    loc = "/%s" % loc
1587                loc = "file://%s" %loc
1588            # NB: if scheme was found, loc == pkg
1589            try:
1590                u = urlopen(loc)
1591            except Exception, e:
1592                raise service_error(service_error.req, 
1593                        "Cannot open %s: %s" % (loc, e))
1594            try:
1595                f = open("%s/%s" % (softdir, dest) , "w")
1596                self.log.debug("Writing %s/%s" % (softdir,dest) )
1597                data = u.read(4096)
1598                while data:
1599                    f.write(data)
1600                    data = u.read(4096)
1601                f.close()
1602                u.close()
1603            except Exception, e:
1604                raise service_error(service_error.internal,
1605                        "Could not copy %s: %s" % (loc, e))
1606            path = re.sub("/tmp", "", linkpath)
1607            # XXX
1608            softmap[pkg] = \
1609                    "%s/%s/%s" %\
1610                    ( self.repo_url, path, dest)
1611
1612            # Allow the individual segments to access the software by assigning
1613            # an attribute to each testbed allocation that encodes the data to
1614            # be released.  This expression collects the data for each run of
1615            # the loop.
1616            auth_attrs.update([
1617                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1618                        for tb in tbparams.keys()])
1619
1620        self.append_experiment_authorization(expid, auth_attrs)
1621
1622        # Convert the software locations in the segments into the local
1623        # copies on this host
1624        for soft in [ s for tb in topo.values() \
1625                for e in tb.elements \
1626                    if getattr(e, 'software', False) \
1627                        for s in e.software ]:
1628            if softmap.has_key(soft.location):
1629                soft.location = softmap[soft.location]
1630
1631
1632    def new_experiment(self, req, fid):
1633        """
1634        The external interface to empty initial experiment creation called from
1635        the dispatcher.
1636
1637        Creates a working directory, splits the incoming description using the
1638        splitter script and parses out the avrious subsections using the
1639        lcasses above.  Once each sub-experiment is created, use pooled threads
1640        to instantiate them and start it all up.
1641        """
1642        self.log.info("New experiment call started for %s" % fid)
1643        req = req.get('NewRequestBody', None)
1644        if not req:
1645            raise service_error(service_error.req,
1646                    "Bad request format (no NewRequestBody)")
1647
1648        # import may partially succeed so always save credentials and warn
1649        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1650            self.log.debug("Failed to import delegation credentials(!)")
1651        self.get_grouper_updates(fid)
1652        self.auth.update()
1653        self.auth.save()
1654
1655        try:
1656            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1657                    with_proof=True)
1658        except service_error, e:
1659            self.log.info("New experiment call for %s: access denied" % fid)
1660            raise e
1661
1662
1663        if not access_ok:
1664            self.log.info("New experiment call for %s: Access denied" % fid)
1665            raise service_error(service_error.access, "New access denied",
1666                    proof=[proof])
1667
1668        try:
1669            tmpdir = tempfile.mkdtemp(prefix="split-")
1670        except EnvironmentError:
1671            raise service_error(service_error.internal, "Cannot create tmp dir")
1672
1673        # Generate an ID for the experiment (slice) and a certificate that the
1674        # allocator can use to prove they own it.  We'll ship it back through
1675        # the encrypted connection.  If the requester supplied one, use it.
1676        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1677            expcert = req['experimentAccess']['X509']
1678            expid = fedid(certstr=expcert)
1679            self.state_lock.acquire()
1680            if expid in self.state:
1681                self.state_lock.release()
1682                raise service_error(service_error.req, 
1683                        'fedid %s identifies an existing experiment' % expid)
1684            self.state_lock.release()
1685        else:
1686            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1687
1688        #now we're done with the tmpdir, and it should be empty
1689        if self.cleanup:
1690            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1691            os.rmdir(tmpdir)
1692        else:
1693            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1694
1695        eid = self.create_experiment_state(fid, req, expid, expcert, 
1696                state='empty')
1697
1698        rv = {
1699                'experimentID': [
1700                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1701                ],
1702                'experimentStatus': 'empty',
1703                'experimentAccess': { 'X509' : expcert },
1704                'proof': proof.to_dict(),
1705            }
1706
1707        self.log.info("New experiment call succeeded for %s" % fid)
1708        return rv
1709
1710    # create_experiment sub-functions
1711
1712    @staticmethod
1713    def get_experiment_key(req, field='experimentID'):
1714        """
1715        Parse the experiment identifiers out of the request (the request body
1716        tag has been removed).  Specifically this pulls either the fedid or the
1717        localname out of the experimentID field.  A fedid is preferred.  If
1718        neither is present or the request does not contain the fields,
1719        service_errors are raised.
1720        """
1721        # Get the experiment access
1722        exp = req.get(field, None)
1723        if exp:
1724            if exp.has_key('fedid'):
1725                key = exp['fedid']
1726            elif exp.has_key('localname'):
1727                key = exp['localname']
1728            else:
1729                raise service_error(service_error.req, "Unknown lookup type")
1730        else:
1731            raise service_error(service_error.req, "No request?")
1732
1733        return key
1734
1735    def get_experiment_ids_and_start(self, key, tmpdir):
1736        """
1737        Get the experiment name, id and access certificate from the state, and
1738        set the experiment state to 'starting'.  returns a triple (fedid,
1739        localname, access_cert_file). The access_cert_file is a copy of the
1740        contents of the access certificate, created in the tempdir with
1741        restricted permissions.  If things are confused, raise an exception.
1742        """
1743
1744        expid = eid = None
1745        self.state_lock.acquire()
1746        if key in self.state:
1747            exp = self.state[key]
1748            exp.status = "starting"
1749            exp.updated()
1750            expid = exp.fedid
1751            eid = exp.localname
1752            expcert = exp.identity
1753        self.state_lock.release()
1754
1755        # make a protected copy of the access certificate so the experiment
1756        # controller can act as the experiment principal.
1757        if expcert:
1758            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1759            if not expcert_file:
1760                raise service_error(service_error.internal, 
1761                        "Cannot create temp cert file?")
1762        else:
1763            expcert_file = None
1764
1765        return (eid, expid, expcert_file)
1766
1767    def get_topology(self, req, tmpdir):
1768        """
1769        Get the ns2 content and put it into a file for parsing.  Call the local
1770        or remote parser and return the topdl.Topology.  Errors result in
1771        exceptions.  req is the request and tmpdir is a work directory.
1772        """
1773
1774        # The tcl parser needs to read a file so put the content into that file
1775        descr=req.get('experimentdescription', None)
1776        if descr:
1777            if 'ns2description' in descr:
1778                file_content=descr['ns2description']
1779            elif 'topdldescription' in descr:
1780                return topdl.Topology(**descr['topdldescription'])
1781            else:
1782                raise service_error(service_error.req, 
1783                        'Unknown experiment description type')
1784        else:
1785            raise service_error(service_error.req, "No experiment description")
1786
1787
1788        if self.splitter_url:
1789            self.log.debug("Calling remote topdl translator at %s" % \
1790                    self.splitter_url)
1791            top = self.remote_ns2topdl(self.splitter_url, file_content)
1792        else:
1793            tclfile = os.path.join(tmpdir, "experiment.tcl")
1794            if file_content:
1795                try:
1796                    f = open(tclfile, 'w')
1797                    f.write(file_content)
1798                    f.close()
1799                except EnvironmentError:
1800                    raise service_error(service_error.internal,
1801                            "Cannot write temp experiment description")
1802            else:
1803                raise service_error(service_error.req, 
1804                        "Only ns2descriptions supported")
1805            pid = "dummy"
1806            gid = "dummy"
1807            eid = "dummy"
1808
1809            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1810                str(self.muxmax), '-m', 'dummy']
1811
1812            tclcmd.extend([pid, gid, eid, tclfile])
1813
1814            self.log.debug("running local splitter %s", " ".join(tclcmd))
1815            # This is just fantastic.  As a side effect the parser copies
1816            # tb_compat.tcl into the current directory, so that directory
1817            # must be writable by the fedd user.  Doing this in the
1818            # temporary subdir ensures this is the case.
1819            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1820                    cwd=tmpdir)
1821            split_data = tclparser.stdout
1822
1823            top = topdl.topology_from_xml(file=split_data, top="experiment")
1824            os.remove(tclfile)
1825
1826        return top
1827
1828    def get_testbed_services(self, req, testbeds):
1829        """
1830        Parse the services section of the request into two dicts mapping
1831        testbed to lists of federated_service objects.  The first dict maps all
1832        exporters of services to those service objects, the second maps
1833        testbeds to service objects only for services requiring portals.
1834        """
1835        # We construct both dicts here because deriving the second is more
1836        # comples than it looks - both the keys and lists can differ, and it's
1837        # much easier to generate both in one pass.
1838        masters = { }
1839        pmasters = { }
1840        for s in req.get('service', []):
1841            # If this is a service request with the importall field
1842            # set, fill it out.
1843
1844            if s.get('importall', False):
1845                s['import'] = [ tb for tb in testbeds \
1846                        if tb not in s.get('export',[])]
1847                del s['importall']
1848
1849            # Add the service to masters
1850            for tb in s.get('export', []):
1851                if s.get('name', None):
1852
1853                    params = { }
1854                    for a in s.get('fedAttr', []):
1855                        params[a.get('attribute', '')] = a.get('value','')
1856
1857                    fser = federated_service(name=s['name'],
1858                            exporter=tb, importers=s.get('import',[]),
1859                            params=params)
1860                    if fser.name == 'hide_hosts' \
1861                            and 'hosts' not in fser.params:
1862                        fser.params['hosts'] = \
1863                                ",".join(tb_hosts.get(fser.exporter, []))
1864                    if tb in masters: masters[tb].append(fser)
1865                    else: masters[tb] = [fser]
1866
1867                    if fser.portal:
1868                        if tb in pmasters: pmasters[tb].append(fser)
1869                        else: pmasters[tb] = [fser]
1870                else:
1871                    self.log.error('Testbed service does not have name " + \
1872                            "and importers')
1873        return masters, pmasters
1874
1875    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1876        """
1877        Create the ssh keys necessary for interconnecting the portal nodes and
1878        the global hosts file for letting each segment know about the IP
1879        addresses in play.  Save these into the repo.  Add attributes to the
1880        autorizer allowing access controllers to download them and return a set
1881        of attributes that inform the segments where to find this stuff.  May
1882        raise service_errors in if there are problems.
1883        """
1884        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1885        gw_secretkey_base = "fed.%s" % self.ssh_type
1886        keydir = os.path.join(tmpdir, 'keys')
1887        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1888        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1889
1890        try:
1891            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1892        except ValueError:
1893            raise service_error(service_error.server_config, 
1894                    "Bad key type (%s)" % self.ssh_type)
1895
1896        self.generate_seer_certs(keydir)
1897
1898        # Copy configuration files into the remote file store
1899        # The config urlpath
1900        configpath = "/%s/config" % expid
1901        # The config file system location
1902        configdir ="%s%s" % ( self.repodir, configpath)
1903        try:
1904            os.makedirs(configdir)
1905        except EnvironmentError, e:
1906            raise service_error(service_error.internal,
1907                    "Cannot create config directory: %s" % e)
1908        try:
1909            f = open("%s/hosts" % configdir, "w")
1910            print >> f, string.join(hosts, '\n')
1911            f.close()
1912        except EnvironmentError, e:
1913            raise service_error(service_error.internal, 
1914                    "Cannot write hosts file: %s" % e)
1915        try:
1916            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1917            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1918            copy_file(os.path.join(keydir, 'ca.pem'), 
1919                    os.path.join(configdir, 'ca.pem'))
1920            copy_file(os.path.join(keydir, 'node.pem'), 
1921                    os.path.join(configdir, 'node.pem'))
1922        except EnvironmentError, e:
1923            raise service_error(service_error.internal, 
1924                    "Cannot copy keyfiles: %s" % e)
1925
1926        # Allow the individual testbeds to access the configuration files,
1927        # again by setting an attribute for the relevant pathnames on each
1928        # allocation principal.  Yeah, that's a long list comprehension.
1929        self.append_experiment_authorization(expid, set([
1930            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1931                    for tb in tbparams.keys() \
1932                        for f in ("hosts", 'ca.pem', 'node.pem', 
1933                            gw_secretkey_base, gw_pubkey_base)]))
1934
1935        attrs = [ 
1936                {
1937                    'attribute': 'ssh_pubkey', 
1938                    'value': '%s/%s/config/%s' % \
1939                            (self.repo_url, expid, gw_pubkey_base)
1940                },
1941                {
1942                    'attribute': 'ssh_secretkey', 
1943                    'value': '%s/%s/config/%s' % \
1944                            (self.repo_url, expid, gw_secretkey_base)
1945                },
1946                {
1947                    'attribute': 'hosts', 
1948                    'value': '%s/%s/config/hosts' % \
1949                            (self.repo_url, expid)
1950                },
1951                {
1952                    'attribute': 'seer_ca_pem', 
1953                    'value': '%s/%s/config/%s' % \
1954                            (self.repo_url, expid, 'ca.pem')
1955                },
1956                {
1957                    'attribute': 'seer_node_pem', 
1958                    'value': '%s/%s/config/%s' % \
1959                            (self.repo_url, expid, 'node.pem')
1960                },
1961            ]
1962        return attrs
1963
1964
1965    def get_vtopo(self, req, fid):
1966        """
1967        Return the stored virtual topology for this experiment
1968        """
1969        rv = None
1970        state = None
1971        self.log.info("vtopo call started for %s" %  fid)
1972
1973        req = req.get('VtopoRequestBody', None)
1974        if not req:
1975            raise service_error(service_error.req,
1976                    "Bad request format (no VtopoRequestBody)")
1977        exp = req.get('experiment', None)
1978        if exp:
1979            if exp.has_key('fedid'):
1980                key = exp['fedid']
1981                keytype = "fedid"
1982            elif exp.has_key('localname'):
1983                key = exp['localname']
1984                keytype = "localname"
1985            else:
1986                raise service_error(service_error.req, "Unknown lookup type")
1987        else:
1988            raise service_error(service_error.req, "No request?")
1989
1990        try:
1991            proof = self.check_experiment_access(fid, key)
1992        except service_error, e:
1993            self.log.info("vtopo call failed for %s: access denied" %  fid)
1994            raise e
1995
1996        self.state_lock.acquire()
1997        # XXX: this needs to be recalculated
1998        if key in self.state:
1999            if self.state[key].top is not None:
2000                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2001                rv = { 'experiment' : {keytype: key },
2002                        'vtopo': vtopo,
2003                        'proof': proof.to_dict(), 
2004                    }
2005            else:
2006                state = self.state[key].status
2007        self.state_lock.release()
2008
2009        if rv: 
2010            self.log.info("vtopo call completed for %s %s " % \
2011                (key, fid))
2012            return rv
2013        else: 
2014            if state:
2015                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2016                    (key, fid))
2017                raise service_error(service_error.partial, 
2018                        "Not ready: %s" % state)
2019            else:
2020                self.log.info("vtopo call completed for %s %s (No experiment)"\
2021                        % (key, fid))
2022                raise service_error(service_error.req, "No such experiment")
2023
2024    def get_vis(self, req, fid):
2025        """
2026        Return the stored visualization for this experiment
2027        """
2028        rv = None
2029        state = None
2030
2031        self.log.info("vis call started for %s" %  fid)
2032        req = req.get('VisRequestBody', None)
2033        if not req:
2034            raise service_error(service_error.req,
2035                    "Bad request format (no VisRequestBody)")
2036        exp = req.get('experiment', None)
2037        if exp:
2038            if exp.has_key('fedid'):
2039                key = exp['fedid']
2040                keytype = "fedid"
2041            elif exp.has_key('localname'):
2042                key = exp['localname']
2043                keytype = "localname"
2044            else:
2045                raise service_error(service_error.req, "Unknown lookup type")
2046        else:
2047            raise service_error(service_error.req, "No request?")
2048
2049        try:
2050            proof = self.check_experiment_access(fid, key)
2051        except service_error, e:
2052            self.log.info("vis call failed for %s: access denied" %  fid)
2053            raise e
2054
2055        self.state_lock.acquire()
2056        # Generate the visualization
2057        if key in self.state:
2058            if self.state[key].top is not None:
2059                try:
2060                    vis = self.genviz(
2061                            topdl.topology_to_vtopo(self.state[key].top))
2062                except service_error, e:
2063                    self.state_lock.release()
2064                    raise e
2065                rv =  { 'experiment' : {keytype: key },
2066                        'vis': vis,
2067                        'proof': proof.to_dict(), 
2068                        }
2069            else:
2070                state = self.state[key].status
2071        self.state_lock.release()
2072
2073        if rv: 
2074            self.log.info("vis call completed for %s %s " % \
2075                (key, fid))
2076            return rv
2077        else:
2078            if state:
2079                self.log.info("vis call completed for %s %s (not ready)" % \
2080                    (key, fid))
2081                raise service_error(service_error.partial, 
2082                        "Not ready: %s" % state)
2083            else:
2084                self.log.info("vis call completed for %s %s (no experiment)" % \
2085                    (key, fid))
2086                raise service_error(service_error.req, "No such experiment")
2087
2088   
2089    def save_federant_information(self, allocated, tbparams, eid, top):
2090        """
2091        Store the various data that have changed in the experiment state
2092        between when it was started and the beginning of resource allocation.
2093        This is basically the information about each local allocation.  This
2094        fills in the values of the placeholder allocation in the state.  It
2095        also collects the access proofs and returns them as dicts for a
2096        response message.
2097        """
2098        self.state_lock.acquire()
2099        exp = self.state[eid]
2100        exp.top = top.clone()
2101        # save federant information
2102        for k in allocated.keys():
2103            exp.add_allocation(tbparams[k])
2104            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2105                type="testbed", localname=[k], 
2106                service=[ s.to_topdl() for s in tbparams[k].services]))
2107
2108        # Access proofs for the response message
2109        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2110                    for p in tbparams[k].proof]
2111        exp.updated()
2112        if self.state_filename: 
2113            self.write_state()
2114        self.state_lock.release()
2115        return proofs
2116
2117    def clear_placeholder(self, eid, expid, tmpdir):
2118        """
2119        Clear the placeholder and remove any allocated temporary dir.
2120        """
2121
2122        self.state_lock.acquire()
2123        del self.state[eid]
2124        del self.state[expid]
2125        if self.state_filename: self.write_state()
2126        self.state_lock.release()
2127        if tmpdir and self.cleanup:
2128            self.remove_dirs(tmpdir)
2129
2130    # end of create_experiment sub-functions
2131
2132    def create_experiment(self, req, fid):
2133        """
2134        The external interface to experiment creation called from the
2135        dispatcher.
2136
2137        Creates a working directory, splits the incoming description using the
2138        splitter script and parses out the various subsections using the
2139        classes above.  Once each sub-experiment is created, use pooled threads
2140        to instantiate them and start it all up.
2141        """
2142
2143        self.log.info("Create experiment call started for %s" % fid)
2144        req = req.get('CreateRequestBody', None)
2145        if req:
2146            key = self.get_experiment_key(req)
2147        else:
2148            raise service_error(service_error.req,
2149                    "Bad request format (no CreateRequestBody)")
2150
2151        # Import information from the requester
2152        # import may partially succeed so always save credentials and warn
2153        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2154            self.log.debug("Failed to import delegation credentials(!)")
2155        self.get_grouper_updates(fid)
2156        self.auth.update()
2157        self.auth.save()
2158
2159        try:
2160            # Make sure that the caller can talk to us
2161            proof = self.check_experiment_access(fid, key)
2162        except service_error, e:
2163            self.log.info("Create experiment call failed for %s: access denied"\
2164                    % fid)
2165            raise e
2166
2167
2168        # Install the testbed map entries supplied with the request into a copy
2169        # of the testbed map.
2170        tbmap = dict(self.tbmap)
2171        tbactive = set(self.tbactive)
2172        for m in req.get('testbedmap', []):
2173            if 'testbed' in m and 'uri' in m:
2174                tbmap[m['testbed']] = m['uri']
2175                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2176
2177        # a place to work
2178        try:
2179            tmpdir = tempfile.mkdtemp(prefix="split-")
2180            os.mkdir(tmpdir+"/keys")
2181        except EnvironmentError:
2182            raise service_error(service_error.internal, "Cannot create tmp dir")
2183
2184        tbparams = { }
2185
2186        eid, expid, expcert_file = \
2187                self.get_experiment_ids_and_start(key, tmpdir)
2188
2189        # This catches exceptions to clear the placeholder if necessary
2190        try: 
2191            if not (eid and expid):
2192                raise service_error(service_error.internal, 
2193                        "Cannot find local experiment info!?")
2194
2195            top = self.get_topology(req, tmpdir)
2196            self.confirm_software(top)
2197            # Assign the IPs
2198            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2199            # Find the testbeds to look up
2200            tb_hosts = { }
2201            testbeds = [ ]
2202            for e in top.elements:
2203                if isinstance(e, topdl.Computer):
2204                    tb = e.get_attribute('testbed') or 'default'
2205                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2206                    else: 
2207                        tb_hosts[tb] = [ e.name ]
2208                        testbeds.append(tb)
2209
2210            masters, pmasters = self.get_testbed_services(req, testbeds)
2211            allocated = { }         # Testbeds we can access
2212            topo ={ }               # Sub topologies
2213            connInfo = { }          # Connection information
2214
2215            self.split_topology(top, topo, testbeds)
2216
2217            self.get_access_to_testbeds(testbeds, fid, allocated, 
2218                    tbparams, masters, tbmap, expid, expcert_file)
2219
2220            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2221
2222            part = experiment_partition(self.auth, self.store_url, tbmap,
2223                    self.muxmax, self.direct_transit, tbactive)
2224            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2225                    connInfo, expid)
2226
2227            auth_attrs = set()
2228            # Now get access to the dynamic testbeds (those added above)
2229            for tb in [ t for t in topo if t not in allocated]:
2230                self.get_access(tb, tbparams, fid, masters, tbmap, 
2231                        expid, expcert_file)
2232                allocated[tb] = 1
2233                store_keys = topo[tb].get_attribute('store_keys')
2234                # Give the testbed access to keys it exports or imports
2235                if store_keys:
2236                    auth_attrs.update(set([
2237                        (tbparams[tb].allocID, sk) \
2238                                for sk in store_keys.split(" ")]))
2239
2240            if auth_attrs:
2241                self.append_experiment_authorization(expid, auth_attrs)
2242
2243            # transit and disconnected testbeds may not have a connInfo entry.
2244            # Fill in the blanks.
2245            for t in allocated.keys():
2246                if not connInfo.has_key(t):
2247                    connInfo[t] = { }
2248
2249            self.wrangle_software(expid, top, topo, tbparams)
2250
2251            proofs = self.save_federant_information(allocated, tbparams, 
2252                    eid, top)
2253        except service_error, e:
2254            # If something goes wrong in the parse (usually an access error)
2255            # clear the placeholder state.  From here on out the code delays
2256            # exceptions.  Failing at this point returns a fault to the remote
2257            # caller.
2258
2259            self.log.info("Create experiment call failed for %s %s: %s" % 
2260                    (eid, fid, e))
2261            self.clear_placeholder(eid, expid, tmpdir)
2262            raise e
2263
2264        # Start the background swapper and return the starting state.  From
2265        # here on out, the state will stick around a while.
2266
2267        # Create a logger that logs to the experiment's state object as well as
2268        # to the main log file.
2269        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2270        alloc_collector = self.list_log(self.state[eid].log)
2271        h = logging.StreamHandler(alloc_collector)
2272        # XXX: there should be a global one of these rather than repeating the
2273        # code.
2274        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2275                    '%d %b %y %H:%M:%S'))
2276        alloc_log.addHandler(h)
2277
2278        # Start a thread to do the resource allocation
2279        t  = Thread(target=self.allocate_resources,
2280                args=(allocated, masters, eid, expid, tbparams, 
2281                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2282                    connInfo, tbmap, expcert_file),
2283                name=eid)
2284        t.start()
2285
2286        rv = {
2287                'experimentID': [
2288                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2289                ],
2290                'experimentStatus': 'starting',
2291                'proof': [ proof.to_dict() ] + proofs,
2292            }
2293        self.log.info("Create experiment call succeeded for %s %s" % \
2294                (eid, fid))
2295
2296        return rv
2297   
2298    def get_experiment_fedid(self, key):
2299        """
2300        find the fedid associated with the localname key in the state database.
2301        """
2302
2303        rv = None
2304        self.state_lock.acquire()
2305        if key in self.state:
2306            rv = self.state[key].fedid
2307        self.state_lock.release()
2308        return rv
2309
2310    def check_experiment_access(self, fid, key):
2311        """
2312        Confirm that the fid has access to the experiment.  Though a request
2313        may be made in terms of a local name, the access attribute is always
2314        the experiment's fedid.
2315        """
2316        if not isinstance(key, fedid):
2317            key = self.get_experiment_fedid(key)
2318
2319        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2320
2321        if access_ok:
2322            return proof
2323        else:
2324            raise service_error(service_error.access, "Access Denied",
2325                proof)
2326
2327
2328    def get_handler(self, path, fid):
2329        """
2330        Perhaps surprisingly named, this function handles HTTP GET requests to
2331        this server (SOAP requests are POSTs).
2332        """
2333        self.log.info("Get handler %s %s" % (path, fid))
2334        if len("%s" % fid) == 0:
2335            return (None, None)
2336        # XXX: log proofs?
2337        if self.auth.check_attribute(fid, path):
2338            return ("%s/%s" % (self.repodir, path), "application/binary")
2339        else:
2340            return (None, None)
2341
2342    def update_info(self, key, force=False):
2343        top = None
2344        self.state_lock.acquire()
2345        if key in self.state:
2346            if force or self.state[key].older_than(self.info_cache_limit):
2347                top = self.state[key].top
2348                if top is not None: top = top.clone()
2349                d1, info_params, cert, d2 = \
2350                        self.get_segment_info(self.state[key], need_lock=False)
2351        self.state_lock.release()
2352
2353        if top is None: return
2354
2355        try:
2356            tmpdir = tempfile.mkdtemp(prefix="info-")
2357        except EnvironmentError:
2358            raise service_error(service_error.internal, 
2359                    "Cannot create tmp dir")
2360        cert_file = self.make_temp_certfile(cert, tmpdir)
2361
2362        data = []
2363        try:
2364            for k, (uri, aid) in info_params.items():
2365                info=self.info_segment(log=self.log, testbed=uri,
2366                            cert_file=cert_file, cert_pwd=None,
2367                            trusted_certs=self.trusted_certs,
2368                            caller=self.call_InfoSegment)
2369                info(uri, aid)
2370                data.append(info)
2371        # Clean up the tmpdir no matter what
2372        finally:
2373            if tmpdir: self.remove_dirs(tmpdir)
2374
2375        self.annotate_topology(top, data)
2376        self.state_lock.acquire()
2377        if key in self.state:
2378            self.state[key].top = top
2379            self.state[key].updated()
2380            if self.state_filename: self.write_state()
2381        self.state_lock.release()
2382
2383   
2384    def get_info(self, req, fid):
2385        """
2386        Return all the stored info about this experiment
2387        """
2388        rv = None
2389
2390        self.log.info("Info call started for %s" %  fid)
2391        req = req.get('InfoRequestBody', None)
2392        if not req:
2393            raise service_error(service_error.req,
2394                    "Bad request format (no InfoRequestBody)")
2395        exp = req.get('experiment', None)
2396        legacy = req.get('legacy', False)
2397        fresh = req.get('fresh', False)
2398        if exp:
2399            if exp.has_key('fedid'):
2400                key = exp['fedid']
2401                keytype = "fedid"
2402            elif exp.has_key('localname'):
2403                key = exp['localname']
2404                keytype = "localname"
2405            else:
2406                raise service_error(service_error.req, "Unknown lookup type")
2407        else:
2408            raise service_error(service_error.req, "No request?")
2409
2410        try:
2411            proof = self.check_experiment_access(fid, key)
2412        except service_error, e:
2413            self.log.info("Info call failed for %s: access denied" %  fid)
2414
2415
2416        self.update_info(key, fresh)
2417
2418        self.state_lock.acquire()
2419        if self.state.has_key(key):
2420            rv = self.state[key].get_info()
2421            # Copy the topo if we need legacy annotations
2422            if legacy:
2423                top = self.state[key].top
2424                if top is not None: top = top.clone()
2425        self.state_lock.release()
2426        self.log.info("Gathered Info for %s %s" % (key, fid))
2427
2428        # If the legacy visualization and topology representations are
2429        # requested, calculate them and add them to the return.
2430        if legacy and rv is not None:
2431            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2432            if top is not None:
2433                vtopo = topdl.topology_to_vtopo(top)
2434                if vtopo is not None:
2435                    rv['vtopo'] = vtopo
2436                    try:
2437                        vis = self.genviz(vtopo)
2438                    except service_error, e:
2439                        self.log.debug('Problem generating visualization: %s' \
2440                                % e)
2441                        vis = None
2442                    if vis is not None:
2443                        rv['vis'] = vis
2444        if rv:
2445            self.log.info("Info succeded for %s %s" % (key, fid))
2446            rv['proof'] = proof.to_dict()
2447            return rv
2448        else: 
2449            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2450            raise service_error(service_error.req, "No such experiment")
2451
2452    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2453            results):
2454        """
2455        Call OperateSegment on multiple testbeds and gather the results.
2456        op_params contains the parameters needed to contact that testbed, cert
2457        is a certificate containing the fedid to use, op is the operation,
2458        testbeds is a dict mapping testbed name to targets in that testbed,
2459        params are the parameters to include a,d results is a growing list of
2460        the results of the calls.
2461        """
2462        try:
2463            tmpdir = tempfile.mkdtemp(prefix="info-")
2464        except EnvironmentError:
2465            raise service_error(service_error.internal, 
2466                    "Cannot create tmp dir")
2467        cert_file = self.make_temp_certfile(cert, tmpdir)
2468
2469        try:
2470            for tb, targets in testbeds.items():
2471                if tb in op_params:
2472                    uri, aid = op_params[tb]
2473                    operate=self.operation_segment(log=self.log, testbed=uri,
2474                                cert_file=cert_file, cert_pwd=None,
2475                                trusted_certs=self.trusted_certs,
2476                                caller=self.call_OperationSegment)
2477                    if operate(uri, aid, op, targets, params):
2478                        if operate.status is not None:
2479                            results.extend(operate.status)
2480                            continue
2481                # Something went wrong in a weird way.  Add statuses
2482                # that reflect that to results
2483                for t in targets:
2484                    results.append(operation_status(t, 
2485                        operation_status.federant,
2486                        'Unexpected error on %s' % tb))
2487        # Clean up the tmpdir no matter what
2488        finally:
2489            if tmpdir: self.remove_dirs(tmpdir)
2490
2491    def do_operation(self, req, fid):
2492        """
2493        Find the testbeds holding each target and ask them to carry out the
2494        operation.  Return the statuses.
2495        """
2496        # Map an element to the testbed containing it
2497        def element_to_tb(e):
2498            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2499            elif isinstance(e, topdl.Testbed): return e.name
2500            else: return None
2501        # If d is an operation_status object, make it a dict
2502        def make_dict(d):
2503            if isinstance(d, dict): return d
2504            elif isinstance(d, operation_status): return d.to_dict()
2505            else: return { }
2506
2507        def element_name(e):
2508            if isinstance(e, topdl.Computer): return e.name
2509            elif isinstance(e, topdl.Testbed): 
2510                if e.localname: return e.localname[0]
2511                else: return None
2512            else: return None
2513
2514        self.log.info("Operation call started for %s" %  fid)
2515        req = req.get('OperationRequestBody', None)
2516        if not req:
2517            raise service_error(service_error.req,
2518                    "Bad request format (no OperationRequestBody)")
2519        exp = req.get('experiment', None)
2520        op = req.get('operation', None)
2521        targets = set(req.get('target', []))
2522        params = req.get('parameter', None)
2523
2524        if exp:
2525            if 'fedid' in exp:
2526                key = exp['fedid']
2527                keytype = "fedid"
2528            elif 'localname' in exp:
2529                key = exp['localname']
2530                keytype = "localname"
2531            else:
2532                raise service_error(service_error.req, "Unknown lookup type")
2533        else:
2534            raise service_error(service_error.req, "No request?")
2535
2536        if op is None or not targets:
2537            raise service_error(service_error.req, "No request?")
2538
2539        try:
2540            proof = self.check_experiment_access(fid, key)
2541        except service_error, e:
2542            self.log.info("Operation call failed for %s: access denied" %  fid)
2543            raise e
2544
2545        self.state_lock.acquire()
2546        if key in self.state:
2547            d1, op_params, cert, d2 = \
2548                    self.get_segment_info(self.state[key], need_lock=False,
2549                            key='tb')
2550            top = self.state[key].top
2551            if top is not None:
2552                top = top.clone()
2553        self.state_lock.release()
2554
2555        if top is None:
2556            self.log.info("Operation call failed for %s: not active" %  fid)
2557            raise service_error(service_error.partial, "No topology yet", 
2558                    proof=proof)
2559
2560        testbeds = { }
2561        results = []
2562        for e in top.elements:
2563            ename = element_name(e)
2564            if ename in targets:
2565                tb = element_to_tb(e)
2566                targets.remove(ename)
2567                if tb is not None:
2568                    if tb in testbeds: testbeds[tb].append(ename)
2569                    else: testbeds[tb] = [ ename ]
2570                else:
2571                    results.append(operation_status(e.name, 
2572                        code=operation_status.no_target, 
2573                        description='Cannot map target to testbed'))
2574
2575        for t in targets:
2576            results.append(operation_status(t, operation_status.no_target))
2577
2578        self.operate_on_segments(op_params, cert, op, testbeds, params,
2579                results)
2580
2581        self.log.info("Operation call succeeded for %s" %  fid)
2582        return { 
2583                'experiment': exp, 
2584                'status': [make_dict(r) for r in results],
2585                'proof': proof.to_dict()
2586                }
2587
2588
2589    def get_multi_info(self, req, fid):
2590        """
2591        Return all the stored info that this fedid can access
2592        """
2593        rv = { 'info': [ ], 'proof': [ ] }
2594
2595        self.log.info("Multi Info call started for %s" %  fid)
2596        self.get_grouper_updates(fid)
2597        self.auth.update()
2598        self.auth.save()
2599        self.state_lock.acquire()
2600        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2601            try:
2602                proof = self.check_experiment_access(fid, key)
2603            except service_error, e:
2604                if e.code == service_error.access:
2605                    continue
2606                else:
2607                    self.log.info("Multi Info call failed for %s: %s" %  \
2608                            (e,fid))
2609                    self.state_lock.release()
2610                    raise e
2611
2612            if self.state.has_key(key):
2613                e = self.state[key].get_info()
2614                e['proof'] = proof.to_dict()
2615                rv['info'].append(e)
2616                rv['proof'].append(proof.to_dict())
2617        self.state_lock.release()
2618        self.log.info("Multi Info call succeeded for %s" %  fid)
2619        return rv
2620
2621    def check_termination_status(self, fed_exp, force):
2622        """
2623        Confirm that the experiment is sin a valid state to stop (or force it)
2624        return the state - invalid states for deletion and force settings cause
2625        exceptions.
2626        """
2627        self.state_lock.acquire()
2628        status = fed_exp.status
2629
2630        if status:
2631            if status in ('starting', 'terminating'):
2632                if not force:
2633                    self.state_lock.release()
2634                    raise service_error(service_error.partial, 
2635                            'Experiment still being created or destroyed')
2636                else:
2637                    self.log.warning('Experiment in %s state ' % status + \
2638                            'being terminated by force.')
2639            self.state_lock.release()
2640            return status
2641        else:
2642            # No status??? trouble
2643            self.state_lock.release()
2644            raise service_error(service_error.internal,
2645                    "Experiment has no status!?")
2646
2647    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2648        ids = []
2649        term_params = { }
2650        if need_lock: self.state_lock.acquire()
2651        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2652        expcert = fed_exp.identity
2653        repo = "%s" % fed_exp.fedid
2654
2655        # Collect the allocation/segment ids into a dict keyed by the fedid
2656        # of the allocation that contains a tuple of uri, aid
2657        for i, fed in enumerate(fed_exp.get_all_allocations()):
2658            uri = fed.uri
2659            aid = fed.allocID
2660            if key == 'aid': term_params[aid] = (uri, aid)
2661            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2662
2663        if need_lock: self.state_lock.release()
2664        return ids, term_params, expcert, repo
2665
2666
2667    def get_termination_info(self, fed_exp):
2668        self.state_lock.acquire()
2669        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2670        # Change the experiment state
2671        fed_exp.status = 'terminating'
2672        fed_exp.updated()
2673        if self.state_filename: self.write_state()
2674        self.state_lock.release()
2675
2676        return ids, term_params, expcert, repo
2677
2678
2679    def deallocate_resources(self, term_params, expcert, status, force, 
2680            dealloc_log):
2681        tmpdir = None
2682        # This try block makes sure the tempdir is cleared
2683        try:
2684            # If no expcert, try the deallocation as the experiment
2685            # controller instance.
2686            if expcert and self.auth_type != 'legacy': 
2687                try:
2688                    tmpdir = tempfile.mkdtemp(prefix="term-")
2689                except EnvironmentError:
2690                    raise service_error(service_error.internal, 
2691                            "Cannot create tmp dir")
2692                cert_file = self.make_temp_certfile(expcert, tmpdir)
2693                pw = None
2694            else: 
2695                cert_file = self.cert_file
2696                pw = self.cert_pwd
2697
2698            # Stop everyone.  NB, wait_for_all waits until a thread starts
2699            # and then completes, so we can't wait if nothing starts.  So,
2700            # no tbparams, no start.
2701            if len(term_params) > 0:
2702                tp = thread_pool(self.nthreads)
2703                for k, (uri, aid) in term_params.items():
2704                    # Create and start a thread to stop the segment
2705                    tp.wait_for_slot()
2706                    t  = pooled_thread(\
2707                            target=self.terminate_segment(log=dealloc_log,
2708                                testbed=uri,
2709                                cert_file=cert_file, 
2710                                cert_pwd=pw,
2711                                trusted_certs=self.trusted_certs,
2712                                caller=self.call_TerminateSegment),
2713                            args=(uri, aid), name=k,
2714                            pdata=tp, trace_file=self.trace_file)
2715                    t.start()
2716                # Wait for completions
2717                tp.wait_for_all_done()
2718
2719            # release the allocations (failed experiments have done this
2720            # already, and starting experiments may be in odd states, so we
2721            # ignore errors releasing those allocations
2722            try: 
2723                for k, (uri, aid)  in term_params.items():
2724                    self.release_access(None, aid, uri=uri,
2725                            cert_file=cert_file, cert_pwd=pw)
2726            except service_error, e:
2727                if status != 'failed' and not force:
2728                    raise e
2729
2730        # Clean up the tmpdir no matter what
2731        finally:
2732            if tmpdir: self.remove_dirs(tmpdir)
2733
2734    def terminate_experiment(self, req, fid):
2735        """
2736        Swap this experiment out on the federants and delete the shared
2737        information
2738        """
2739        self.log.info("Terminate experiment call started for %s" % fid)
2740        tbparams = { }
2741        req = req.get('TerminateRequestBody', None)
2742        if not req:
2743            raise service_error(service_error.req,
2744                    "Bad request format (no TerminateRequestBody)")
2745
2746        key = self.get_experiment_key(req, 'experiment')
2747        try:
2748            proof = self.check_experiment_access(fid, key)
2749        except service_error, e:
2750            self.log.info(
2751                    "Terminate experiment call failed for %s: access denied" \
2752                            % fid)
2753            raise e
2754        exp = req.get('experiment', False)
2755        force = req.get('force', False)
2756
2757        dealloc_list = [ ]
2758
2759
2760        # Create a logger that logs to the dealloc_list as well as to the main
2761        # log file.
2762        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2763        dealloc_log.info("Terminating %s " %key)
2764        h = logging.StreamHandler(self.list_log(dealloc_list))
2765        # XXX: there should be a global one of these rather than repeating the
2766        # code.
2767        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2768                    '%d %b %y %H:%M:%S'))
2769        dealloc_log.addHandler(h)
2770
2771        self.state_lock.acquire()
2772        fed_exp = self.state.get(key, None)
2773        self.state_lock.release()
2774        repo = None
2775
2776        if fed_exp:
2777            status = self.check_termination_status(fed_exp, force)
2778            # get_termination_info updates the experiment state
2779            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2780            self.deallocate_resources(term_params, expcert, status, force, 
2781                    dealloc_log)
2782
2783            # Remove the terminated experiment
2784            self.state_lock.acquire()
2785            for id in ids:
2786                self.clear_experiment_authorization(id, need_state_lock=False)
2787                if id in self.state: del self.state[id]
2788
2789            if self.state_filename: self.write_state()
2790            self.state_lock.release()
2791
2792            # Delete any synch points associated with this experiment.  All
2793            # synch points begin with the fedid of the experiment.
2794            fedid_keys = set(["fedid:%s" % f for f in ids \
2795                    if isinstance(f, fedid)])
2796            for k in self.synch_store.all_keys():
2797                try:
2798                    if len(k) > 45 and k[0:46] in fedid_keys:
2799                        self.synch_store.del_value(k)
2800                except synch_store.BadDeletionError:
2801                    pass
2802            self.write_store()
2803
2804            # Remove software and other cached stuff from the filesystem.
2805            if repo:
2806                self.remove_dirs("%s/%s" % (self.repodir, repo))
2807       
2808            self.log.info("Terminate experiment succeeded for %s %s" % \
2809                    (key, fid))
2810            return { 
2811                    'experiment': exp , 
2812                    'deallocationLog': string.join(dealloc_list, ''),
2813                    'proof': [proof.to_dict()],
2814                    }
2815        else:
2816            self.log.info("Terminate experiment failed for %s %s: no state" % \
2817                    (key, fid))
2818            raise service_error(service_error.req, "No saved state")
2819
2820
2821    def GetValue(self, req, fid):
2822        """
2823        Get a value from the synchronized store
2824        """
2825        req = req.get('GetValueRequestBody', None)
2826        if not req:
2827            raise service_error(service_error.req,
2828                    "Bad request format (no GetValueRequestBody)")
2829       
2830        name = req.get('name', None)
2831        wait = req.get('wait', False)
2832        rv = { 'name': name }
2833
2834        if not name:
2835            raise service_error(service_error.req, "No name?")
2836
2837        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2838
2839        if access_ok:
2840            self.log.debug("[GetValue] asking for %s " % name)
2841            try:
2842                v = self.synch_store.get_value(name, wait)
2843            except synch_store.RevokedKeyError:
2844                # No more synch on this key
2845                raise service_error(service_error.federant, 
2846                        "Synch key %s revoked" % name)
2847            if v is not None:
2848                rv['value'] = v
2849            rv['proof'] = proof.to_dict()
2850            self.log.debug("[GetValue] got %s from %s" % (v, name))
2851            return rv
2852        else:
2853            raise service_error(service_error.access, "Access Denied",
2854                    proof=proof)
2855       
2856
2857    def SetValue(self, req, fid):
2858        """
2859        Set a value in the synchronized store
2860        """
2861        req = req.get('SetValueRequestBody', None)
2862        if not req:
2863            raise service_error(service_error.req,
2864                    "Bad request format (no SetValueRequestBody)")
2865       
2866        name = req.get('name', None)
2867        v = req.get('value', '')
2868
2869        if not name:
2870            raise service_error(service_error.req, "No name?")
2871
2872        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2873
2874        if access_ok:
2875            try:
2876                self.synch_store.set_value(name, v)
2877                self.write_store()
2878                self.log.debug("[SetValue] set %s to %s" % (name, v))
2879            except synch_store.CollisionError:
2880                # Translate into a service_error
2881                raise service_error(service_error.req,
2882                        "Value already set: %s" %name)
2883            except synch_store.RevokedKeyError:
2884                # No more synch on this key
2885                raise service_error(service_error.federant, 
2886                        "Synch key %s revoked" % name)
2887                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2888        else:
2889            raise service_error(service_error.access, "Access Denied",
2890                    proof=proof)
Note: See TracBrowser for help on using the repository browser.