source: fedd/federation/experiment_control.py @ eac54fa

compt_changes
Last change on this file since eac54fa was eac54fa, checked in by Ted Faber <faber@…>, 12 years ago

Remote debugging

  • Property mode set to 100644
File size: 95.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46import list_log
47
48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
55class experiment_control_local(experiment_control_legacy):
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
67    call_StartSegment = service_caller('StartSegment')
68    call_TerminateSegment = service_caller('TerminateSegment')
69    call_InfoSegment = service_caller('InfoSegment')
70    call_OperationSegment = service_caller('OperationSegment')
71    call_Ns2Topdl = service_caller('Ns2Topdl')
72
73    def __init__(self, config=None, auth=None):
74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
92        self.list_log = list_log.list_log
93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
104        self.repodir = config.get("experiment_control", "repodir")
105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
107
108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
112        self.nthreads = 10
113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
129        self.grouper_url = config.get("experiment_control", "grouper_url")
130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
135        dt = config.get("experiment_control", "direct_transit")
136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
146
147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
154
155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
186            self.get_access = self.legacy_get_access
187        elif self.auth_type == 'abac':
188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
216        # Dispatch tables
217        self.soap_services = {\
218                'New': soap_handler('New', self.new_experiment),
219                'Create': soap_handler('Create', self.create_experiment),
220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
224                'Operation': soap_handler('Operation', self.do_operation),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'Operation': xmlrpc_handler('Operation', self.do_operation),
241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
243        }
244
245    # Call while holding self.state_lock
246    def write_state(self):
247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
259        except EnvironmentError, e:
260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
266
267    @staticmethod
268    def get_alloc_ids(exp):
269        """
270        Used by read_store and read state.  This used to be worse.
271        """
272
273        return [ a.allocID for a in exp.get_all_allocations() ]
274
275
276    # Call while holding self.state_lock
277    def read_state(self):
278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283       
284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
289        except EnvironmentError, e:
290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
296        for s in self.state.values():
297            try:
298
299                eid = s.fedid
300                if eid : 
301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
304                        #self.auth.set_attribute(s['owner'], eid)
305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
314                else:
315                    raise KeyError("No experiment id")
316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
319
320    def read_mapdb(self, file):
321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
324        also adds testbeds to active if they include , active after
325        their name.
326        """
327
328        self.tbmap = { }
329        lineno =0
330        try:
331            f = open(file, "r")
332            for line in f:
333                lineno += 1
334                line = line.strip()
335                if line.startswith('#') or len(line) == 0:
336                    continue
337                try:
338                    label, url = line.split(':', 1)
339                    self.tbmap[label] = url
340                except ValueError, e:
341                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
342                            "map db: %s %s" % (lineno, line, e))
343        except EnvironmentError, e:
344            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
345                    "open %s: %s" % (file, e))
346        else:
347            f.close()
348
349    def read_store(self):
350        try:
351            self.synch_store = synch_store()
352            self.synch_store.load(self.store_filename)
353            self.log.debug("[read_store]: Read store from %s" % \
354                    self.store_filename)
355        except EnvironmentError, e:
356            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
357                    % (self.state_filename, e))
358            self.synch_store = synch_store()
359
360        # Set the initial permissions on data in the store.  XXX: This ad hoc
361        # authorization attribute initialization is getting out of hand.
362        # XXX: legacy
363        if self.auth_type == 'legacy':
364            for k in self.synch_store.all_keys():
365                try:
366                    if k.startswith('fedid:'):
367                        fid = fedid(hexstr=k[6:46])
368                        if self.state.has_key(fid):
369                            for a in self.get_alloc_ids(self.state[fid]):
370                                self.auth.set_attribute(a, k)
371                except ValueError, e:
372                    self.log.warn("Cannot deduce permissions for %s" % k)
373
374
375    def write_store(self):
376        """
377        Write a new copy of synch_store after writing current state
378        to a backup.  We use the internal synch_store pickle method to avoid
379        incinsistent data.
380
381        State format is a simple pickling of the store.
382        """
383        if os.access(self.store_filename, os.W_OK):
384            copy_file(self.store_filename, \
385                    "%s.bak" % self.store_filename)
386        try:
387            self.synch_store.save(self.store_filename)
388        except EnvironmentError, e:
389            self.log.error("Can't write file %s: %s" % \
390                    (self.store_filename, e))
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    # XXX this may belong somewhere else
395
396    def get_grouper_updates(self, fid):
397        if self.grouper_url is None: return
398        d = tempfile.mkdtemp()
399        try:
400            fstr = "%s" % fid
401            # XXX locking
402            zipname = os.path.join(d, 'grouper.zip')
403            dest = os.path.join(self.auth_dir, 'update')
404            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
405            f = open(zipname, 'w')
406            f.write(resp.read())
407            f.close()
408            zf = zipfile.ZipFile(zipname, 'r')
409            zf.extractall(dest)
410            zf.close()
411        except URLError, e:
412            self.log.error("Cannot connect to grouper: %s" % e)
413            pass
414        finally:
415            shutil.rmtree(d)
416
417
418    def remove_dirs(self, dir):
419        """
420        Remove the directory tree and all files rooted at dir.  Log any errors,
421        but continue.
422        """
423        self.log.debug("[removedirs]: removing %s" % dir)
424        try:
425            for path, dirs, files in os.walk(dir, topdown=False):
426                for f in files:
427                    os.remove(os.path.join(path, f))
428                for d in dirs:
429                    os.rmdir(os.path.join(path, d))
430            os.rmdir(dir)
431        except EnvironmentError, e:
432            self.log.error("Error deleting directory tree in %s" % e);
433
434    @staticmethod
435    def make_temp_certfile(expcert, tmpdir):
436        """
437        make a protected copy of the access certificate so the experiment
438        controller can act as the experiment principal.  mkstemp is the most
439        secure way to do that. The directory should be created by
440        mkdtemp.  Return the filename.
441        """
442        if expcert and tmpdir:
443            try:
444                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
445                f = os.fdopen(certf, 'w')
446                print >> f, expcert
447                f.close()
448            except EnvironmentError, e:
449                raise service_error(service_error.internal, 
450                        "Cannot create temp cert file?")
451            return certfn
452        else:
453            return None
454
455       
456    def generate_ssh_keys(self, dest, type="rsa" ):
457        """
458        Generate a set of keys for the gateways to use to talk.
459
460        Keys are of type type and are stored in the required dest file.
461        """
462        valid_types = ("rsa", "dsa")
463        t = type.lower();
464        if t not in valid_types: raise ValueError
465        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
466
467        try:
468            trace = open("/dev/null", "w")
469        except EnvironmentError:
470            raise service_error(service_error.internal,
471                    "Cannot open /dev/null??");
472
473        # May raise CalledProcessError
474        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
475        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
476        if rv != 0:
477            raise service_error(service_error.internal, 
478                    "Cannot generate nonce ssh keys.  %s return code %d" \
479                            % (self.ssh_keygen, rv))
480
481    def generate_seer_certs(self, destdir):
482        '''
483        Create a SEER ca cert and a node cert in destdir/ca.pem and
484        destdir/node.pem respectively.  These will be distributed throughout
485        the federated experiment.  This routine reports errors via
486        service_errors.
487        '''
488        openssl = '/usr/bin/openssl'
489        # All the filenames and parameters we need for openssl calls below
490        ca_key =os.path.join(destdir, 'ca.key') 
491        ca_pem = os.path.join(destdir, 'ca.pem')
492        node_key =os.path.join(destdir, 'node.key') 
493        node_pem = os.path.join(destdir, 'node.pem')
494        node_req = os.path.join(destdir, 'node.req')
495        node_signed = os.path.join(destdir, 'node.signed')
496        days = '%s' % (365 * 10)
497        serial = '%s' % random.randint(0, 1<<16)
498
499        try:
500            # Sequence of calls to create a CA key, create a ca cert, create a
501            # node key, node signing request, and finally a signed node
502            # certificate.
503            sequence = (
504                    (openssl, 'genrsa', '-out', ca_key, '1024'),
505                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
506                        ca_pem, '-days', days, '-subj', 
507                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
508                    (openssl, 'genrsa', '-out', node_key, '1024'),
509                    (openssl, 'req', '-new', '-key', node_key, '-out', 
510                        node_req, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
512                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
513                        '-set_serial', serial, '-req', '-in', node_req, 
514                        '-out', node_signed, '-days', days),
515                )
516            # Do all that stuff; bail if there's an error, and push all the
517            # output to dev/null.
518            for cmd in sequence:
519                trace = open("/dev/null", "w")
520                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
521                if rv != 0:
522                    raise service_error(service_error.internal, 
523                            "Cannot generate SEER certs.  %s return code %d" \
524                                    % (' '.join(cmd), rv))
525            # Concatinate the node key and signed certificate into node.pem
526            f = open(node_pem, 'w')
527            for comp in (node_signed, node_key):
528                g = open(comp, 'r')
529                f.write(g.read())
530                g.close()
531            f.close()
532
533            # Throw out intermediaries.
534            for fn in (ca_key, node_key, node_req, node_signed):
535                os.unlink(fn)
536
537        except EnvironmentError, e:
538            # Any difficulties with the file system wind up here
539            raise service_error(service_error.internal,
540                    "File error on  %s while creating SEER certs: %s" % \
541                            (e.filename, e.strerror))
542
543
544
545    def gentopo(self, str):
546        """
547        Generate the topology data structure from the splitter's XML
548        representation of it.
549
550        The topology XML looks like:
551            <experiment>
552                <nodes>
553                    <node><vname></vname><ips>ip1:ip2</ips></node>
554                </nodes>
555                <lans>
556                    <lan>
557                        <vname></vname><vnode></vnode><ip></ip>
558                        <bandwidth></bandwidth><member>node:port</member>
559                    </lan>
560                </lans>
561        """
562        class topo_parse:
563            """
564            Parse the topology XML and create the dats structure.
565            """
566            def __init__(self):
567                # Typing of the subelements for data conversion
568                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
569                self.int_subelements = ( 'bandwidth',)
570                self.float_subelements = ( 'delay',)
571                # The final data structure
572                self.nodes = [ ]
573                self.lans =  [ ]
574                self.topo = { \
575                        'node': self.nodes,\
576                        'lan' : self.lans,\
577                    }
578                self.element = { }  # Current element being created
579                self.chars = ""     # Last text seen
580
581            def end_element(self, name):
582                # After each sub element the contents is added to the current
583                # element or to the appropriate list.
584                if name == 'node':
585                    self.nodes.append(self.element)
586                    self.element = { }
587                elif name == 'lan':
588                    self.lans.append(self.element)
589                    self.element = { }
590                elif name in self.str_subelements:
591                    self.element[name] = self.chars
592                    self.chars = ""
593                elif name in self.int_subelements:
594                    self.element[name] = int(self.chars)
595                    self.chars = ""
596                elif name in self.float_subelements:
597                    self.element[name] = float(self.chars)
598                    self.chars = ""
599
600            def found_chars(self, data):
601                self.chars += data.rstrip()
602
603
604        tp = topo_parse();
605        parser = xml.parsers.expat.ParserCreate()
606        parser.EndElementHandler = tp.end_element
607        parser.CharacterDataHandler = tp.found_chars
608
609        parser.Parse(str)
610
611        return tp.topo
612       
613
614    def genviz(self, topo):
615        """
616        Generate the visualization the virtual topology
617        """
618
619        neato = "/usr/local/bin/neato"
620        # These are used to parse neato output and to create the visualization
621        # file.
622        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
623        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
624                "%s</type></node>"
625
626        try:
627            # Node names
628            nodes = [ n['vname'] for n in topo['node'] ]
629            topo_lans = topo['lan']
630        except KeyError, e:
631            raise service_error(service_error.internal, "Bad topology: %s" %e)
632
633        lans = { }
634        links = { }
635
636        # Walk through the virtual topology, organizing the connections into
637        # 2-node connections (links) and more-than-2-node connections (lans).
638        # When a lan is created, it's added to the list of nodes (there's a
639        # node in the visualization for the lan).
640        for l in topo_lans:
641            if links.has_key(l['vname']):
642                if len(links[l['vname']]) < 2:
643                    links[l['vname']].append(l['vnode'])
644                else:
645                    nodes.append(l['vname'])
646                    lans[l['vname']] = links[l['vname']]
647                    del links[l['vname']]
648                    lans[l['vname']].append(l['vnode'])
649            elif lans.has_key(l['vname']):
650                lans[l['vname']].append(l['vnode'])
651            else:
652                links[l['vname']] = [ l['vnode'] ]
653
654
655        # Open up a temporary file for dot to turn into a visualization
656        try:
657            df, dotname = tempfile.mkstemp()
658            dotfile = os.fdopen(df, 'w')
659        except EnvironmentError:
660            raise service_error(service_error.internal,
661                    "Failed to open file in genviz")
662
663        try:
664            dnull = open('/dev/null', 'w')
665        except EnvironmentError:
666            service_error(service_error.internal,
667                    "Failed to open /dev/null in genviz")
668
669        # Generate a dot/neato input file from the links, nodes and lans
670        try:
671            print >>dotfile, "graph G {"
672            for n in nodes:
673                print >>dotfile, '\t"%s"' % n
674            for l in links.keys():
675                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
676            for l in lans.keys():
677                for n in lans[l]:
678                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
679            print >>dotfile, "}"
680            dotfile.close()
681        except TypeError:
682            raise service_error(service_error.internal,
683                    "Single endpoint link in vtopo")
684        except EnvironmentError:
685            raise service_error(service_error.internal, "Cannot write dot file")
686
687        # Use dot to create a visualization
688        try:
689            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
690                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
691                stderr=dnull, close_fds=True)
692        except EnvironmentError:
693            raise service_error(service_error.internal, 
694                    "Cannot generate visualization: is graphviz available?")
695        dnull.close()
696
697        # Translate dot to vis format
698        vis_nodes = [ ]
699        vis = { 'node': vis_nodes }
700        for line in dot.stdout:
701            m = vis_re.match(line)
702            if m:
703                vn = m.group(1)
704                vis_node = {'name': vn, \
705                        'x': float(m.group(2)),\
706                        'y' : float(m.group(3)),\
707                    }
708                if vn in links.keys() or vn in lans.keys():
709                    vis_node['type'] = 'lan'
710                else:
711                    vis_node['type'] = 'node'
712                vis_nodes.append(vis_node)
713        rv = dot.wait()
714
715        os.remove(dotname)
716        # XXX: graphviz seems to use low return codes for warnings, like
717        # "couldn't find font"
718        if rv < 2 : return vis
719        else: return None
720
721
722    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
723            cert_pwd=None):
724        """
725        Release access to testbed through fedd
726        """
727
728        if not uri and tbmap:
729            uri = tbmap.get(tb, None)
730        if not uri:
731            raise service_error(service_error.server_config, 
732                    "Unknown testbed: %s" % tb)
733
734        if self.local_access.has_key(uri):
735            resp = self.local_access[uri].ReleaseAccess(\
736                    { 'ReleaseAccessRequestBody' : 
737                        {'allocID': {'fedid': aid}},}, 
738                    fedid(file=cert_file))
739            resp = { 'ReleaseAccessResponseBody': resp } 
740        else:
741            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
742                    cert_file, cert_pwd, self.trusted_certs)
743
744        # better error coding
745
746    def remote_ns2topdl(self, uri, desc):
747
748        req = {
749                'description' : { 'ns2description': desc },
750            }
751
752        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
753                self.trusted_certs)
754
755        if r.has_key('Ns2TopdlResponseBody'):
756            r = r['Ns2TopdlResponseBody']
757            ed = r.get('experimentdescription', None)
758            if ed.has_key('topdldescription'):
759                return topdl.Topology(**ed['topdldescription'])
760            else:
761                raise service_error(service_error.protocol, 
762                        "Bad splitter response (no output)")
763        else:
764            raise service_error(service_error.protocol, "Bad splitter response")
765
766    class start_segment:
767        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
768                cert_pwd=None, trusted_certs=None, caller=None,
769                log_collector=None):
770            self.log = log
771            self.debug = debug
772            self.cert_file = cert_file
773            self.cert_pwd = cert_pwd
774            self.trusted_certs = None
775            self.caller = caller
776            self.testbed = testbed
777            self.log_collector = log_collector
778            self.response = None
779            self.node = { }
780            self.subs = { }
781            self.tb = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            if 'segmentdescription' not in resp  or \
786                    'topdldescription' not in resp['segmentdescription']:
787                self.log.warn('No topology returned from startsegment')
788                return 
789
790            top = topdl.Topology(
791                    **resp['segmentdescription']['topdldescription'])
792
793            for e in top.elements:
794                if isinstance(e, topdl.Computer):
795                    self.node[e.name] = e
796                elif isinstance(e, topdl.Testbed):
797                    self.tb[e.uri] = e
798            for s in top.substrates:
799                self.subs[s.name] = s
800
801        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
802            req = {
803                    'allocID': { 'fedid' : aid }, 
804                    'segmentdescription': { 
805                        'topdldescription': topo.to_dict(),
806                    },
807                }
808
809            if connInfo:
810                req['connection'] = connInfo
811
812            import_svcs = [ s for m in masters.values() \
813                    for s in m if self.testbed in s.importers]
814
815            if import_svcs or self.testbed in masters:
816                req['service'] = []
817
818            for s in import_svcs:
819                for r in s.reqs:
820                    sr = copy.deepcopy(r)
821                    sr['visibility'] = 'import';
822                    req['service'].append(sr)
823
824            for s in masters.get(self.testbed, []):
825                for r in s.reqs:
826                    sr = copy.deepcopy(r)
827                    sr['visibility'] = 'export';
828                    req['service'].append(sr)
829
830            if attrs:
831                req['fedAttr'] = attrs
832
833            try:
834                self.log.debug("Calling StartSegment at %s " % uri)
835                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
836                        self.trusted_certs)
837                if r.has_key('StartSegmentResponseBody'):
838                    lval = r['StartSegmentResponseBody'].get('allocationLog',
839                            None)
840                    if lval and self.log_collector:
841                        for line in  lval.splitlines(True):
842                            self.log_collector.write(line)
843                    self.make_map(r['StartSegmentResponseBody'])
844                    if 'proof' in r: self.proof = r['proof']
845                    self.response = r
846                else:
847                    raise service_error(service_error.internal, 
848                            "Bad response!?: %s" %r)
849                return True
850            except service_error, e:
851                self.log.error("Start segment failed on %s: %s" % \
852                        (self.testbed, e))
853                return False
854
855
856
857    class terminate_segment:
858        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
859                cert_pwd=None, trusted_certs=None, caller=None):
860            self.log = log
861            self.debug = debug
862            self.cert_file = cert_file
863            self.cert_pwd = cert_pwd
864            self.trusted_certs = None
865            self.caller = caller
866            self.testbed = testbed
867
868        def __call__(self, uri, aid ):
869            req = {
870                    'allocID': {'fedid': aid }, 
871                }
872            self.log.info("Calling terminate segment")
873            try:
874                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
875                        self.trusted_certs)
876                self.log.info("Terminate segment succeeded")
877                return True
878            except service_error, e:
879                self.log.error("Terminate segment failed on %s: %s" % \
880                        (self.testbed, e))
881                return False
882
883    class info_segment(start_segment):
884        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
885                cert_pwd=None, trusted_certs=None, caller=None,
886                log_collector=None):
887            experiment_control_local.start_segment.__init__(self, debug, 
888                    log, testbed, cert_file, cert_pwd, trusted_certs, 
889                    caller, log_collector)
890
891        def __call__(self, uri, aid):
892            req = { 'allocID': { 'fedid' : aid } }
893
894            try:
895                self.log.debug("Calling InfoSegment at %s " % uri)
896                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
897                        self.trusted_certs)
898                if r.has_key('InfoSegmentResponseBody'):
899                    self.make_map(r['InfoSegmentResponseBody'])
900                    if 'proof' in r: self.proof = r['proof']
901                    self.response = r
902                else:
903                    raise service_error(service_error.internal, 
904                            "Bad response!?: %s" %r)
905                return True
906            except service_error, e:
907                self.log.error("Info segment failed on %s: %s" % \
908                        (self.testbed, e))
909                return False
910
911    class operation_segment:
912        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
913                cert_pwd=None, trusted_certs=None, caller=None,
914                log_collector=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922            self.status = None
923
924        def __call__(self, uri, aid, op, targets, params):
925            req = { 
926                    'allocID': { 'fedid' : aid },
927                    'operation': op,
928                    'target': targets,
929                    }
930            if params: req['parameter'] = params
931
932
933            try:
934                self.log.debug("Calling OperationSegment at %s " % uri)
935                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
936                        self.trusted_certs)
937                if 'OperationSegmentResponseBody' in r:
938                    r = r['OperationSegmentResponseBody']
939                    if 'status' in r:
940                        self.status = r['status']
941                else:
942                    raise service_error(service_error.internal, 
943                            "Bad response!?: %s" %r)
944                return True
945            except service_error, e:
946                self.log.error("Operation segment failed on %s: %s" % \
947                        (self.testbed, e))
948                return False
949
950    def annotate_topology(self, top, data):
951        # These routines do various parts of the annotation
952        def add_new_names(nl, l):
953            """ add any names in nl to the list in l """
954            for n in nl:
955                if n not in l: l.append(n)
956       
957        def merge_services(ne, e):
958            for ns in ne.service:
959                # NB: the else is on the for
960                for s in e.service:
961                    if ns.name == s.name:
962                        s.importer = ns.importer
963                        s.param = ns.param
964                        s.description = ns.description
965                        s.status = ns.status
966                        break
967                else:
968                    e.service.append(ns)
969       
970        def merge_oses(ne, e):
971            """
972            Merge the operating system entries of ne into e
973            """
974            for nos in ne.os:
975                # NB: the else is on the for
976                for os in e.os:
977                    if nos.name == os.name:
978                        os.version = nos.version
979                        os.version = nos.distribution
980                        os.version = nos.distributionversion
981                        for a in nos.attribute:
982                            if os.get_attribute(a.attribute):
983                                os.remove_attribute(a.attribute)
984                            os.set_attribute(a.attribute, a.value)
985                        break
986                else:
987                    # If both nodes have one OS, this is a replacement
988                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
989                    else: e.os.append(nos)
990
991        # Annotate the topology with embedding info
992        for e in top.elements:
993            if isinstance(e, topdl.Computer):
994                for s in data:
995                    ne = s.node.get(e.name, None)
996                    if ne is not None:
997                        add_new_names(ne.localname, e.localname)
998                        e.status = ne.status
999                        merge_services(ne, e)
1000                        add_new_names(ne.operation, e.operation)
1001                        if ne.os: merge_oses(ne, e)
1002                        break
1003            elif isinstance(e,topdl.Testbed):
1004                for s in data:
1005                    ne = s.tb.get(e.uri, None)
1006                    if ne is not None:
1007                        add_new_names(ne.localname, e.localname)
1008                        add_new_names(ne.operation, e.operation)
1009                        merge_services(ne, e)
1010                        for a in ne.attribute:
1011                            e.set_attribute(a.attribute, a.value)
1012        # Annotate substrates
1013        for s in top.substrates:
1014            for d in data:
1015                ss = d.subs.get(s.name, None)
1016                if ss is not None:
1017                    if ss.capacity is not None:
1018                        s.capacity = ss.capacity
1019                    if s.latency is not None:
1020                        s.latency = ss.latency
1021
1022
1023
1024    def allocate_resources(self, allocated, masters, eid, expid, 
1025            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1026            attrs=None, connInfo={}, tbmap=None, expcert=None):
1027
1028        started = { }           # Testbeds where a sub-experiment started
1029                                # successfully
1030
1031        # XXX
1032        fail_soft = False
1033
1034        if tbmap is None: tbmap = { }
1035
1036        log = alloc_log or self.log
1037
1038        tp = thread_pool(self.nthreads)
1039        threads = [ ]
1040        starters = [ ]
1041
1042        if expcert:
1043            cert = expcert
1044            pw = None
1045        else:
1046            cert = self.cert_file
1047            pw = self.cert_pwd
1048
1049        for tb in allocated.keys():
1050            # Create and start a thread to start the segment, and save it
1051            # to get the return value later
1052            tb_attrs = copy.copy(attrs)
1053            tp.wait_for_slot()
1054            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1055            base, suffix = split_testbed(tb)
1056            if suffix:
1057                tb_attrs.append({'attribute': 'experiment_name', 
1058                    'value': "%s-%s" % (eid, suffix)})
1059            else:
1060                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1061            if not uri:
1062                raise service_error(service_error.internal, 
1063                        "Unknown testbed %s !?" % tb)
1064
1065            aid = tbparams[tb].allocID
1066            if not aid:
1067                raise service_error(service_error.internal, 
1068                        "No alloc id for testbed %s !?" % tb)
1069
1070            s = self.start_segment(log=log, debug=self.debug,
1071                    testbed=tb, cert_file=cert,
1072                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1073                    caller=self.call_StartSegment,
1074                    log_collector=log_collector)
1075            starters.append(s)
1076            t  = pooled_thread(\
1077                    target=s, name=tb,
1078                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1079                    pdata=tp, trace_file=self.trace_file)
1080            threads.append(t)
1081            t.start()
1082
1083        # Wait until all finish (keep pinging the log, though)
1084        mins = 0
1085        revoked = False
1086        while not tp.wait_for_all_done(60.0):
1087            mins += 1
1088            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                    % mins)
1090            if not revoked and \
1091                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1092                # a testbed has failed.  Revoke this experiment's
1093                # synchronizarion values so that sub experiments will not
1094                # deadlock waiting for synchronization that will never happen
1095                self.log.info("A subexperiment has failed to swap in, " + \
1096                        "revoking synch keys")
1097                var_key = "fedid:%s" % expid
1098                for k in self.synch_store.all_keys():
1099                    if len(k) > 45 and k[0:46] == var_key:
1100                        self.synch_store.revoke_key(k)
1101                revoked = True
1102
1103        failed = [ t.getName() for t in threads if not t.rv ]
1104        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1105
1106        # If one failed clean up, unless fail_soft is set
1107        if failed:
1108            if not fail_soft:
1109                tp.clear()
1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
1112                    tp.wait_for_slot()
1113                    uri = tbparams[tb].uri
1114                    t  = pooled_thread(\
1115                            target=self.terminate_segment(log=log,
1116                                testbed=tb,
1117                                cert_file=cert, 
1118                                cert_pwd=pw,
1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
1121                            args=(uri, tbparams[tb].allocID),
1122                            name=tb,
1123                            pdata=tp, trace_file=self.trace_file)
1124                    t.start()
1125                # Wait until all finish (if any are being stopped)
1126                if succeeded:
1127                    tp.wait_for_all_done()
1128
1129                # release the allocations
1130                for tb in tbparams.keys():
1131                    try:
1132                        self.release_access(tb, tbparams[tb].allocID, 
1133                                tbmap=tbmap, uri=tbparams[tb].uri,
1134                                cert_file=cert, cert_pwd=pw)
1135                    except service_error, e:
1136                        self.log.warn("Error releasing access: %s" % e.desc)
1137                # Remove the placeholder
1138                self.state_lock.acquire()
1139                self.state[eid].status = 'failed'
1140                self.state[eid].updated()
1141                if self.state_filename: self.write_state()
1142                self.state_lock.release()
1143                # Remove the repo dir
1144                self.remove_dirs("%s/%s" %(self.repodir, expid))
1145                # Walk up tmpdir, deleting as we go
1146                if self.cleanup:
1147                    self.remove_dirs(tmpdir)
1148                else:
1149                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
1151
1152                log.error("Swap in failed on %s" % ",".join(failed))
1153                return
1154        else:
1155            # Walk through the successes and gather the proofs
1156            proofs = { }
1157            for s in starters:
1158                if s.proof:
1159                    proofs[s.testbed] = s.proof
1160            self.annotate_topology(top, starters)
1161            log.info("[start_segment]: Experiment %s active" % eid)
1162
1163
1164        # Walk up tmpdir, deleting as we go
1165        if self.cleanup:
1166            self.remove_dirs(tmpdir)
1167        else:
1168            log.debug("[start_experiment]: not removing %s" % tmpdir)
1169
1170        # Insert the experiment into our state and update the disk copy.
1171        self.state_lock.acquire()
1172        self.state[expid].status = 'active'
1173        self.state[eid] = self.state[expid]
1174        self.state[eid].top = top
1175        self.state[eid].updated()
1176        # Append startup proofs
1177        for f in self.state[eid].get_all_allocations():
1178            if f.tb in proofs:
1179                f.proof.append(proofs[f.tb])
1180
1181        if self.state_filename: self.write_state()
1182        self.state_lock.release()
1183        return
1184
1185
1186    def add_kit(self, e, kit):
1187        """
1188        Add a Software object created from the list of (install, location)
1189        tuples passed as kit  to the software attribute of an object e.  We
1190        do this enough to break out the code, but it's kind of a hack to
1191        avoid changing the old tuple rep.
1192        """
1193
1194        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1195
1196        if isinstance(e.software, list): e.software.extend(s)
1197        else: e.software = s
1198
1199    def append_experiment_authorization(self, expid, attrs, 
1200            need_state_lock=True):
1201        """
1202        Append the authorization information to system state
1203        """
1204
1205        for p, a in attrs:
1206            self.auth.set_attribute(p, a)
1207        self.auth.save()
1208
1209        if need_state_lock: self.state_lock.acquire()
1210        # XXX: really a no op?
1211        #self.state[expid]['auth'].update(attrs)
1212        if self.state_filename: self.write_state()
1213        if need_state_lock: self.state_lock.release()
1214
1215    def clear_experiment_authorization(self, expid, need_state_lock=True):
1216        """
1217        Attrs is a set of attribute principal pairs that need to be removed
1218        from the authenticator.  Remove them and save the authenticator.
1219        """
1220
1221        if need_state_lock: self.state_lock.acquire()
1222        # XXX: should be a no-op
1223        #if expid in self.state and 'auth' in self.state[expid]:
1224            #for p, a in self.state[expid]['auth']:
1225                #self.auth.unset_attribute(p, a)
1226            #self.state[expid]['auth'] = set()
1227        if self.state_filename: self.write_state()
1228        if need_state_lock: self.state_lock.release()
1229        self.auth.save()
1230
1231
1232    def create_experiment_state(self, fid, req, expid, expcert,
1233            state='starting'):
1234        """
1235        Create the initial entry in the experiment's state.  The expid and
1236        expcert are the experiment's fedid and certifacte that represents that
1237        ID, which are installed in the experiment state.  If the request
1238        includes a suggested local name that is used if possible.  If the local
1239        name is already taken by an experiment owned by this user that has
1240        failed, it is overwritten.  Otherwise new letters are added until a
1241        valid localname is found.  The generated local name is returned.
1242        """
1243
1244        if req.has_key('experimentID') and \
1245                req['experimentID'].has_key('localname'):
1246            overwrite = False
1247            eid = req['experimentID']['localname']
1248            # If there's an old failed experiment here with the same local name
1249            # and accessible by this user, we'll overwrite it, otherwise we'll
1250            # fall through and do the collision avoidance.
1251            old_expid = self.get_experiment_fedid(eid)
1252            if old_expid:
1253                users_experiment = True
1254                try:
1255                    self.check_experiment_access(fid, old_expid)
1256                except service_error, e:
1257                    if e.code == service_error.access: users_experiment = False
1258                    else: raise e
1259                if users_experiment:
1260                    self.state_lock.acquire()
1261                    status = self.state[eid].status
1262                    if status and status == 'failed':
1263                        # remove the old access attributes
1264                        self.clear_experiment_authorization(eid,
1265                                need_state_lock=False)
1266                        overwrite = True
1267                        del self.state[eid]
1268                        del self.state[old_expid]
1269                    self.state_lock.release()
1270                else:
1271                    self.log.info('Experiment %s exists, ' % eid + \
1272                            'but this user cannot access it')
1273            self.state_lock.acquire()
1274            while (self.state.has_key(eid) and not overwrite):
1275                eid += random.choice(string.ascii_letters)
1276            # Initial state
1277            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1278                    identity=expcert)
1279            self.state[expid] = self.state[eid]
1280            if self.state_filename: self.write_state()
1281            self.state_lock.release()
1282        else:
1283            eid = self.exp_stem
1284            for i in range(0,5):
1285                eid += random.choice(string.ascii_letters)
1286            self.state_lock.acquire()
1287            while (self.state.has_key(eid)):
1288                eid = self.exp_stem
1289                for i in range(0,5):
1290                    eid += random.choice(string.ascii_letters)
1291            # Initial state
1292            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1293                    identity=expcert)
1294            self.state[expid] = self.state[eid]
1295            if self.state_filename: self.write_state()
1296            self.state_lock.release()
1297
1298        # Let users touch the state.  Authorize this fid and the expid itself
1299        # to touch the experiment, as well as allowing th eoverrides.
1300        self.append_experiment_authorization(eid, 
1301                set([(fid, expid), (expid,expid)] + \
1302                        [ (o, expid) for o in self.overrides]))
1303
1304        return eid
1305
1306
1307    def allocate_ips_to_topo(self, top):
1308        """
1309        Add an ip4_address attribute to all the hosts in the topology, based on
1310        the shared substrates on which they sit.  An /etc/hosts file is also
1311        created and returned as a list of hostfiles entries.  We also return
1312        the allocator, because we may need to allocate IPs to portals
1313        (specifically DRAGON portals).
1314        """
1315        subs = sorted(top.substrates, 
1316                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1317                reverse=True)
1318        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1319        ifs = { }
1320        hosts = [ ]
1321
1322        for idx, s in enumerate(subs):
1323            net_size = len(s.interfaces)+2
1324
1325            a = ips.allocate(net_size)
1326            if a :
1327                base, num = a
1328                if num < net_size: 
1329                    raise service_error(service_error.internal,
1330                            "Allocator returned wrong number of IPs??")
1331            else:
1332                raise service_error(service_error.req, 
1333                        "Cannot allocate IP addresses")
1334            mask = ips.min_alloc
1335            while mask < net_size:
1336                mask *= 2
1337
1338            netmask = ((2**32-1) ^ (mask-1))
1339
1340            base += 1
1341            for i in s.interfaces:
1342                i.attribute.append(
1343                        topdl.Attribute('ip4_address', 
1344                            "%s" % ip_addr(base)))
1345                i.attribute.append(
1346                        topdl.Attribute('ip4_netmask', 
1347                            "%s" % ip_addr(int(netmask))))
1348
1349                hname = i.element.name
1350                if ifs.has_key(hname):
1351                    hosts.append("%s\t%s-%s %s-%d" % \
1352                            (ip_addr(base), hname, s.name, hname,
1353                                ifs[hname]))
1354                else:
1355                    ifs[hname] = 0
1356                    hosts.append("%s\t%s-%s %s-%d %s" % \
1357                            (ip_addr(base), hname, s.name, hname,
1358                                ifs[hname], hname))
1359
1360                ifs[hname] += 1
1361                base += 1
1362        return hosts, ips
1363
1364    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1365            tbparam, masters, tbmap, expid=None, expcert=None):
1366        for tb in testbeds:
1367            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1368                    expcert)
1369            allocated[tb] = 1
1370
1371    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1372            expcert=None):
1373        """
1374        Get access to testbed through fedd and set the parameters for that tb
1375        """
1376        def get_export_project(svcs):
1377            """
1378            Look through for the list of federated_service for this testbed
1379            objects for a project_export service, and extract the project
1380            parameter.
1381            """
1382
1383            pe = [s for s in svcs if s.name=='project_export']
1384            if len(pe) == 1:
1385                return pe[0].params.get('project', None)
1386            elif len(pe) == 0:
1387                return None
1388            else:
1389                raise service_error(service_error.req,
1390                        "More than one project export is not supported")
1391
1392        def add_services(svcs, type, slist, keys):
1393            """
1394            Add the given services to slist.  type is import or export.  Also
1395            add a mapping entry from the assigned id to the original service
1396            record.
1397            """
1398            for i, s in enumerate(svcs):
1399                idx = '%s%d' % (type, i)
1400                keys[idx] = s
1401                sr = {'id': idx, 'name': s.name, 'visibility': type }
1402                if s.params:
1403                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1404                            for k, v in s.params.items()]
1405                slist.append(sr)
1406
1407        uri = tbmap.get(testbed_base(tb), None)
1408        if not uri:
1409            raise service_error(service_error.server_config, 
1410                    "Unknown testbed: %s" % tb)
1411
1412        export_svcs = masters.get(tb,[])
1413        import_svcs = [ s for m in masters.values() \
1414                for s in m \
1415                    if tb in s.importers ]
1416
1417        export_project = get_export_project(export_svcs)
1418        # Compose the credential list so that IDs come before attributes
1419        creds = set()
1420        keys = set()
1421        certs = self.auth.get_creds_for_principal(fid)
1422        # Append credenials about this experiment controller - e.g. that it is
1423        # trusted.
1424        certs.update(self.auth.get_creds_for_principal(
1425            fedid(file=self.cert_file)))
1426        if expid:
1427            certs.update(self.auth.get_creds_for_principal(expid))
1428        for c in certs:
1429            keys.add(c.issuer_cert())
1430            creds.add(c.attribute_cert())
1431        creds = list(keys) + list(creds)
1432
1433        if expcert: cert, pw = expcert, None
1434        else: cert, pw = self.cert_file, self.cert_pw
1435
1436        # Request credentials
1437        req = {
1438                'abac_credential': creds,
1439            }
1440        # Make the service request from the services we're importing and
1441        # exporting.  Keep track of the export request ids so we can
1442        # collect the resulting info from the access response.
1443        e_keys = { }
1444        if import_svcs or export_svcs:
1445            slist = []
1446            add_services(import_svcs, 'import', slist, e_keys)
1447            add_services(export_svcs, 'export', slist, e_keys)
1448            req['service'] = slist
1449
1450        if self.local_access.has_key(uri):
1451            # Local access call
1452            req = { 'RequestAccessRequestBody' : req }
1453            r = self.local_access[uri].RequestAccess(req, 
1454                    fedid(file=self.cert_file))
1455            r = { 'RequestAccessResponseBody' : r }
1456        else:
1457            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1458
1459        if r.has_key('RequestAccessResponseBody'):
1460            # Through to here we have a valid response, not a fault.
1461            # Access denied is a fault, so something better or worse than
1462            # access denied has happened.
1463            r = r['RequestAccessResponseBody']
1464            self.log.debug("[get_access] Access granted")
1465        else:
1466            raise service_error(service_error.protocol,
1467                        "Bad proxy response")
1468        if 'proof' not in r:
1469            raise service_error(service_error.protocol,
1470                        "Bad access response (no access proof)")
1471
1472        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1473                tb=tb, uri=uri, proof=[r['proof']], 
1474                services=masters.get(tb, None))
1475
1476        # Collect the responses corresponding to the services this testbed
1477        # exports.  These will be the service requests that we will include in
1478        # the start segment requests (with appropriate visibility values) to
1479        # import and export the segments.
1480        for s in r.get('service', []):
1481            id = s.get('id', None)
1482            # Note that this attaches the response to the object in the masters
1483            # data structure.  (The e_keys index disappears when this fcn
1484            # returns)
1485            if id and id in e_keys:
1486                e_keys[id].reqs.append(s)
1487
1488        # Add attributes to parameter space.  We don't allow attributes to
1489        # overlay any parameters already installed.
1490        for a in r.get('fedAttr', []):
1491            try:
1492                if a['attribute']:
1493                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1494            except KeyError:
1495                self.log.error("Bad attribute in response: %s" % a)
1496
1497
1498    def split_topology(self, top, topo, testbeds):
1499        """
1500        Create the sub-topologies that are needed for experiment instantiation.
1501        """
1502        for tb in testbeds:
1503            topo[tb] = top.clone()
1504            # copy in for loop allows deletions from the original
1505            for e in [ e for e in topo[tb].elements]:
1506                etb = e.get_attribute('testbed')
1507                # NB: elements without a testbed attribute won't appear in any
1508                # sub topologies. 
1509                if not etb or etb != tb:
1510                    for i in e.interface:
1511                        for s in i.subs:
1512                            try:
1513                                s.interfaces.remove(i)
1514                            except ValueError:
1515                                raise service_error(service_error.internal,
1516                                        "Can't remove interface??")
1517                    topo[tb].elements.remove(e)
1518            topo[tb].make_indices()
1519
1520    def confirm_software(self, top):
1521        """
1522        Make sure that the software to be loaded in the topo is all available
1523        before we begin making access requests, etc.  This is a subset of
1524        wrangle_software.
1525        """
1526        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1527        pkgs.update([x.location for e in top.elements for x in e.software])
1528
1529        for pkg in pkgs:
1530            loc = pkg
1531
1532            scheme, host, path = urlparse(loc)[0:3]
1533            dest = os.path.basename(path)
1534            if not scheme:
1535                if not loc.startswith('/'):
1536                    loc = "/%s" % loc
1537                loc = "file://%s" %loc
1538            # NB: if scheme was found, loc == pkg
1539            try:
1540                u = urlopen(loc)
1541                u.close()
1542            except Exception, e:
1543                raise service_error(service_error.req, 
1544                        "Cannot open %s: %s" % (loc, e))
1545        return True
1546
1547    def wrangle_software(self, expid, top, topo, tbparams):
1548        """
1549        Copy software out to the repository directory, allocate permissions and
1550        rewrite the segment topologies to look for the software in local
1551        places.
1552        """
1553
1554        # XXX: PNNL debug
1555        self.log.debug("calling wrangle software")
1556        # Copy the rpms and tarfiles to a distribution directory from
1557        # which the federants can retrieve them
1558        linkpath = "%s/software" %  expid
1559        softdir ="%s/%s" % ( self.repodir, linkpath)
1560        softmap = { }
1561
1562        # self.fedkit and self.gateway kit are lists of tuples of
1563        # (install_location, download_location) this extracts the download
1564        # locations.
1565        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1566        pkgs.update([x.location for e in top.elements for x in e.software])
1567        try:
1568            # XXX: PNNL debug
1569            self.log.debug("wrangle software: makedirs")
1570            os.makedirs(softdir)
1571            # XXX: PNNL debug
1572            self.log.debug("wrangle software: makedirs success")
1573        except EnvironmentError, e:
1574            raise service_error(
1575                    "Cannot create software directory: %s" % e)
1576        # The actual copying.  Everything's converted into a url for copying.
1577        auth_attrs = set()
1578        for pkg in pkgs:
1579            loc = pkg
1580
1581            scheme, host, path = urlparse(loc)[0:3]
1582            dest = os.path.basename(path)
1583            if not scheme:
1584                if not loc.startswith('/'):
1585                    loc = "/%s" % loc
1586                loc = "file://%s" %loc
1587            # NB: if scheme was found, loc == pkg
1588            try:
1589                u = urlopen(loc)
1590            except Exception, e:
1591                raise service_error(service_error.req, 
1592                        "Cannot open %s: %s" % (loc, e))
1593            try:
1594                f = open("%s/%s" % (softdir, dest) , "w")
1595                self.log.debug("Writing %s/%s" % (softdir,dest) )
1596                data = u.read(4096)
1597                while data:
1598                    f.write(data)
1599                    data = u.read(4096)
1600                f.close()
1601                u.close()
1602            except Exception, e:
1603                raise service_error(service_error.internal,
1604                        "Could not copy %s: %s" % (loc, e))
1605            path = re.sub("/tmp", "", linkpath)
1606            # XXX
1607            softmap[pkg] = \
1608                    "%s/%s/%s" %\
1609                    ( self.repo_url, path, dest)
1610
1611            # Allow the individual segments to access the software by assigning
1612            # an attribute to each testbed allocation that encodes the data to
1613            # be released.  This expression collects the data for each run of
1614            # the loop.
1615            auth_attrs.update([
1616                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1617                        for tb in tbparams.keys()])
1618
1619        self.append_experiment_authorization(expid, auth_attrs)
1620
1621        # Convert the software locations in the segments into the local
1622        # copies on this host
1623        for soft in [ s for tb in topo.values() \
1624                for e in tb.elements \
1625                    if getattr(e, 'software', False) \
1626                        for s in e.software ]:
1627            if softmap.has_key(soft.location):
1628                soft.location = softmap[soft.location]
1629
1630
1631    def new_experiment(self, req, fid):
1632        """
1633        The external interface to empty initial experiment creation called from
1634        the dispatcher.
1635
1636        Creates a working directory, splits the incoming description using the
1637        splitter script and parses out the avrious subsections using the
1638        lcasses above.  Once each sub-experiment is created, use pooled threads
1639        to instantiate them and start it all up.
1640        """
1641        self.log.info("New experiment call started for %s" % fid)
1642        req = req.get('NewRequestBody', None)
1643        if not req:
1644            raise service_error(service_error.req,
1645                    "Bad request format (no NewRequestBody)")
1646
1647        # import may partially succeed so always save credentials and warn
1648        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1649            self.log.debug("Failed to import delegation credentials(!)")
1650        self.get_grouper_updates(fid)
1651        self.auth.update()
1652        self.auth.save()
1653
1654        try:
1655            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1656                    with_proof=True)
1657        except service_error, e:
1658            self.log.info("New experiment call for %s: access denied" % fid)
1659            raise e
1660
1661
1662        if not access_ok:
1663            self.log.info("New experiment call for %s: Access denied" % fid)
1664            raise service_error(service_error.access, "New access denied",
1665                    proof=[proof])
1666
1667        try:
1668            tmpdir = tempfile.mkdtemp(prefix="split-")
1669        except EnvironmentError:
1670            raise service_error(service_error.internal, "Cannot create tmp dir")
1671
1672        # Generate an ID for the experiment (slice) and a certificate that the
1673        # allocator can use to prove they own it.  We'll ship it back through
1674        # the encrypted connection.  If the requester supplied one, use it.
1675        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1676            expcert = req['experimentAccess']['X509']
1677            expid = fedid(certstr=expcert)
1678            self.state_lock.acquire()
1679            if expid in self.state:
1680                self.state_lock.release()
1681                raise service_error(service_error.req, 
1682                        'fedid %s identifies an existing experiment' % expid)
1683            self.state_lock.release()
1684        else:
1685            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1686
1687        #now we're done with the tmpdir, and it should be empty
1688        if self.cleanup:
1689            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1690            os.rmdir(tmpdir)
1691        else:
1692            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1693
1694        eid = self.create_experiment_state(fid, req, expid, expcert, 
1695                state='empty')
1696
1697        rv = {
1698                'experimentID': [
1699                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1700                ],
1701                'experimentStatus': 'empty',
1702                'experimentAccess': { 'X509' : expcert },
1703                'proof': proof.to_dict(),
1704            }
1705
1706        self.log.info("New experiment call succeeded for %s" % fid)
1707        return rv
1708
1709    # create_experiment sub-functions
1710
1711    @staticmethod
1712    def get_experiment_key(req, field='experimentID'):
1713        """
1714        Parse the experiment identifiers out of the request (the request body
1715        tag has been removed).  Specifically this pulls either the fedid or the
1716        localname out of the experimentID field.  A fedid is preferred.  If
1717        neither is present or the request does not contain the fields,
1718        service_errors are raised.
1719        """
1720        # Get the experiment access
1721        exp = req.get(field, None)
1722        if exp:
1723            if exp.has_key('fedid'):
1724                key = exp['fedid']
1725            elif exp.has_key('localname'):
1726                key = exp['localname']
1727            else:
1728                raise service_error(service_error.req, "Unknown lookup type")
1729        else:
1730            raise service_error(service_error.req, "No request?")
1731
1732        return key
1733
1734    def get_experiment_ids_and_start(self, key, tmpdir):
1735        """
1736        Get the experiment name, id and access certificate from the state, and
1737        set the experiment state to 'starting'.  returns a triple (fedid,
1738        localname, access_cert_file). The access_cert_file is a copy of the
1739        contents of the access certificate, created in the tempdir with
1740        restricted permissions.  If things are confused, raise an exception.
1741        """
1742
1743        expid = eid = None
1744        self.state_lock.acquire()
1745        if key in self.state:
1746            exp = self.state[key]
1747            exp.status = "starting"
1748            exp.updated()
1749            expid = exp.fedid
1750            eid = exp.localname
1751            expcert = exp.identity
1752        self.state_lock.release()
1753
1754        # make a protected copy of the access certificate so the experiment
1755        # controller can act as the experiment principal.
1756        if expcert:
1757            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1758            if not expcert_file:
1759                raise service_error(service_error.internal, 
1760                        "Cannot create temp cert file?")
1761        else:
1762            expcert_file = None
1763
1764        return (eid, expid, expcert_file)
1765
1766    def get_topology(self, req, tmpdir):
1767        """
1768        Get the ns2 content and put it into a file for parsing.  Call the local
1769        or remote parser and return the topdl.Topology.  Errors result in
1770        exceptions.  req is the request and tmpdir is a work directory.
1771        """
1772
1773        # The tcl parser needs to read a file so put the content into that file
1774        descr=req.get('experimentdescription', None)
1775        if descr:
1776            if 'ns2description' in descr:
1777                file_content=descr['ns2description']
1778            elif 'topdldescription' in descr:
1779                return topdl.Topology(**descr['topdldescription'])
1780            else:
1781                raise service_error(service_error.req, 
1782                        'Unknown experiment description type')
1783        else:
1784            raise service_error(service_error.req, "No experiment description")
1785
1786
1787        if self.splitter_url:
1788            self.log.debug("Calling remote topdl translator at %s" % \
1789                    self.splitter_url)
1790            top = self.remote_ns2topdl(self.splitter_url, file_content)
1791        else:
1792            tclfile = os.path.join(tmpdir, "experiment.tcl")
1793            if file_content:
1794                try:
1795                    f = open(tclfile, 'w')
1796                    f.write(file_content)
1797                    f.close()
1798                except EnvironmentError:
1799                    raise service_error(service_error.internal,
1800                            "Cannot write temp experiment description")
1801            else:
1802                raise service_error(service_error.req, 
1803                        "Only ns2descriptions supported")
1804            pid = "dummy"
1805            gid = "dummy"
1806            eid = "dummy"
1807
1808            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1809                str(self.muxmax), '-m', 'dummy']
1810
1811            tclcmd.extend([pid, gid, eid, tclfile])
1812
1813            self.log.debug("running local splitter %s", " ".join(tclcmd))
1814            # This is just fantastic.  As a side effect the parser copies
1815            # tb_compat.tcl into the current directory, so that directory
1816            # must be writable by the fedd user.  Doing this in the
1817            # temporary subdir ensures this is the case.
1818            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1819                    cwd=tmpdir)
1820            split_data = tclparser.stdout
1821
1822            top = topdl.topology_from_xml(file=split_data, top="experiment")
1823            os.remove(tclfile)
1824
1825        return top
1826
1827    def get_testbed_services(self, req, testbeds):
1828        """
1829        Parse the services section of the request into two dicts mapping
1830        testbed to lists of federated_service objects.  The first dict maps all
1831        exporters of services to those service objects, the second maps
1832        testbeds to service objects only for services requiring portals.
1833        """
1834
1835        # Sanity check the services.  Exports or imports from unknown testbeds
1836        # cause an exception.
1837        for s in req.get('service', []):
1838            for t in s.get('import', []):
1839                if t not in testbeds:
1840                    raise service_error(service_error.req, 
1841                            'Service import to unknown testbed: %s' %t)
1842            for t in s.get('export', []):
1843                if t not in testbeds:
1844                    raise service_error(service_error.req, 
1845                            'Service export from unknown testbed: %s' %t)
1846
1847
1848        # We construct both dicts here because deriving the second is more
1849        # complex than it looks - both the keys and lists can differ, and it's
1850        # much easier to generate both in one pass.
1851        masters = { }
1852        pmasters = { }
1853        for s in req.get('service', []):
1854            # If this is a service request with the importall field
1855            # set, fill it out.
1856
1857            if s.get('importall', False):
1858                s['import'] = [ tb for tb in testbeds \
1859                        if tb not in s.get('export',[])]
1860                del s['importall']
1861
1862            # Add the service to masters
1863            for tb in s.get('export', []):
1864                if s.get('name', None):
1865
1866                    params = { }
1867                    for a in s.get('fedAttr', []):
1868                        params[a.get('attribute', '')] = a.get('value','')
1869
1870                    fser = federated_service(name=s['name'],
1871                            exporter=tb, importers=s.get('import',[]),
1872                            params=params)
1873                    if fser.name == 'hide_hosts' \
1874                            and 'hosts' not in fser.params:
1875                        fser.params['hosts'] = \
1876                                ",".join(tb_hosts.get(fser.exporter, []))
1877                    if tb in masters: masters[tb].append(fser)
1878                    else: masters[tb] = [fser]
1879
1880                    if fser.portal:
1881                        if tb in pmasters: pmasters[tb].append(fser)
1882                        else: pmasters[tb] = [fser]
1883                else:
1884                    self.log.error('Testbed service does not have name " + \
1885                            "and importers')
1886        return masters, pmasters
1887
1888    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1889        """
1890        Create the ssh keys necessary for interconnecting the portal nodes and
1891        the global hosts file for letting each segment know about the IP
1892        addresses in play.  Save these into the repo.  Add attributes to the
1893        autorizer allowing access controllers to download them and return a set
1894        of attributes that inform the segments where to find this stuff.  May
1895        raise service_errors in if there are problems.
1896        """
1897        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1898        gw_secretkey_base = "fed.%s" % self.ssh_type
1899        keydir = os.path.join(tmpdir, 'keys')
1900        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1901        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1902
1903        try:
1904            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1905        except ValueError:
1906            raise service_error(service_error.server_config, 
1907                    "Bad key type (%s)" % self.ssh_type)
1908
1909        self.generate_seer_certs(keydir)
1910
1911        # Copy configuration files into the remote file store
1912        # The config urlpath
1913        configpath = "/%s/config" % expid
1914        # The config file system location
1915        configdir ="%s%s" % ( self.repodir, configpath)
1916        try:
1917            os.makedirs(configdir)
1918        except EnvironmentError, e:
1919            raise service_error(service_error.internal,
1920                    "Cannot create config directory: %s" % e)
1921        try:
1922            f = open("%s/hosts" % configdir, "w")
1923            print >> f, string.join(hosts, '\n')
1924            f.close()
1925        except EnvironmentError, e:
1926            raise service_error(service_error.internal, 
1927                    "Cannot write hosts file: %s" % e)
1928        try:
1929            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1930            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1931            copy_file(os.path.join(keydir, 'ca.pem'), 
1932                    os.path.join(configdir, 'ca.pem'))
1933            copy_file(os.path.join(keydir, 'node.pem'), 
1934                    os.path.join(configdir, 'node.pem'))
1935        except EnvironmentError, e:
1936            raise service_error(service_error.internal, 
1937                    "Cannot copy keyfiles: %s" % e)
1938
1939        # Allow the individual testbeds to access the configuration files,
1940        # again by setting an attribute for the relevant pathnames on each
1941        # allocation principal.  Yeah, that's a long list comprehension.
1942        self.append_experiment_authorization(expid, set([
1943            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1944                    for tb in tbparams.keys() \
1945                        for f in ("hosts", 'ca.pem', 'node.pem', 
1946                            gw_secretkey_base, gw_pubkey_base)]))
1947
1948        attrs = [ 
1949                {
1950                    'attribute': 'ssh_pubkey', 
1951                    'value': '%s/%s/config/%s' % \
1952                            (self.repo_url, expid, gw_pubkey_base)
1953                },
1954                {
1955                    'attribute': 'ssh_secretkey', 
1956                    'value': '%s/%s/config/%s' % \
1957                            (self.repo_url, expid, gw_secretkey_base)
1958                },
1959                {
1960                    'attribute': 'hosts', 
1961                    'value': '%s/%s/config/hosts' % \
1962                            (self.repo_url, expid)
1963                },
1964                {
1965                    'attribute': 'seer_ca_pem', 
1966                    'value': '%s/%s/config/%s' % \
1967                            (self.repo_url, expid, 'ca.pem')
1968                },
1969                {
1970                    'attribute': 'seer_node_pem', 
1971                    'value': '%s/%s/config/%s' % \
1972                            (self.repo_url, expid, 'node.pem')
1973                },
1974            ]
1975        return attrs
1976
1977
1978    def get_vtopo(self, req, fid):
1979        """
1980        Return the stored virtual topology for this experiment
1981        """
1982        rv = None
1983        state = None
1984        self.log.info("vtopo call started for %s" %  fid)
1985
1986        req = req.get('VtopoRequestBody', None)
1987        if not req:
1988            raise service_error(service_error.req,
1989                    "Bad request format (no VtopoRequestBody)")
1990        exp = req.get('experiment', None)
1991        if exp:
1992            if exp.has_key('fedid'):
1993                key = exp['fedid']
1994                keytype = "fedid"
1995            elif exp.has_key('localname'):
1996                key = exp['localname']
1997                keytype = "localname"
1998            else:
1999                raise service_error(service_error.req, "Unknown lookup type")
2000        else:
2001            raise service_error(service_error.req, "No request?")
2002
2003        try:
2004            proof = self.check_experiment_access(fid, key)
2005        except service_error, e:
2006            self.log.info("vtopo call failed for %s: access denied" %  fid)
2007            raise e
2008
2009        self.state_lock.acquire()
2010        # XXX: this needs to be recalculated
2011        if key in self.state:
2012            if self.state[key].top is not None:
2013                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2014                rv = { 'experiment' : {keytype: key },
2015                        'vtopo': vtopo,
2016                        'proof': proof.to_dict(), 
2017                    }
2018            else:
2019                state = self.state[key].status
2020        self.state_lock.release()
2021
2022        if rv: 
2023            self.log.info("vtopo call completed for %s %s " % \
2024                (key, fid))
2025            return rv
2026        else: 
2027            if state:
2028                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2029                    (key, fid))
2030                raise service_error(service_error.partial, 
2031                        "Not ready: %s" % state)
2032            else:
2033                self.log.info("vtopo call completed for %s %s (No experiment)"\
2034                        % (key, fid))
2035                raise service_error(service_error.req, "No such experiment")
2036
2037    def get_vis(self, req, fid):
2038        """
2039        Return the stored visualization for this experiment
2040        """
2041        rv = None
2042        state = None
2043
2044        self.log.info("vis call started for %s" %  fid)
2045        req = req.get('VisRequestBody', None)
2046        if not req:
2047            raise service_error(service_error.req,
2048                    "Bad request format (no VisRequestBody)")
2049        exp = req.get('experiment', None)
2050        if exp:
2051            if exp.has_key('fedid'):
2052                key = exp['fedid']
2053                keytype = "fedid"
2054            elif exp.has_key('localname'):
2055                key = exp['localname']
2056                keytype = "localname"
2057            else:
2058                raise service_error(service_error.req, "Unknown lookup type")
2059        else:
2060            raise service_error(service_error.req, "No request?")
2061
2062        try:
2063            proof = self.check_experiment_access(fid, key)
2064        except service_error, e:
2065            self.log.info("vis call failed for %s: access denied" %  fid)
2066            raise e
2067
2068        self.state_lock.acquire()
2069        # Generate the visualization
2070        if key in self.state:
2071            if self.state[key].top is not None:
2072                try:
2073                    vis = self.genviz(
2074                            topdl.topology_to_vtopo(self.state[key].top))
2075                except service_error, e:
2076                    self.state_lock.release()
2077                    raise e
2078                rv =  { 'experiment' : {keytype: key },
2079                        'vis': vis,
2080                        'proof': proof.to_dict(), 
2081                        }
2082            else:
2083                state = self.state[key].status
2084        self.state_lock.release()
2085
2086        if rv: 
2087            self.log.info("vis call completed for %s %s " % \
2088                (key, fid))
2089            return rv
2090        else:
2091            if state:
2092                self.log.info("vis call completed for %s %s (not ready)" % \
2093                    (key, fid))
2094                raise service_error(service_error.partial, 
2095                        "Not ready: %s" % state)
2096            else:
2097                self.log.info("vis call completed for %s %s (no experiment)" % \
2098                    (key, fid))
2099                raise service_error(service_error.req, "No such experiment")
2100
2101   
2102    def save_federant_information(self, allocated, tbparams, eid, top):
2103        """
2104        Store the various data that have changed in the experiment state
2105        between when it was started and the beginning of resource allocation.
2106        This is basically the information about each local allocation.  This
2107        fills in the values of the placeholder allocation in the state.  It
2108        also collects the access proofs and returns them as dicts for a
2109        response message.
2110        """
2111        self.state_lock.acquire()
2112        exp = self.state[eid]
2113        exp.top = top.clone()
2114        # save federant information
2115        for k in allocated.keys():
2116            exp.add_allocation(tbparams[k])
2117            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2118                type="testbed", localname=[k], 
2119                service=[ s.to_topdl() for s in tbparams[k].services]))
2120
2121        # Access proofs for the response message
2122        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2123                    for p in tbparams[k].proof]
2124        exp.updated()
2125        if self.state_filename: 
2126            self.write_state()
2127        self.state_lock.release()
2128        return proofs
2129
2130    def clear_placeholder(self, eid, expid, tmpdir):
2131        """
2132        Clear the placeholder and remove any allocated temporary dir.
2133        """
2134
2135        self.state_lock.acquire()
2136        del self.state[eid]
2137        del self.state[expid]
2138        if self.state_filename: self.write_state()
2139        self.state_lock.release()
2140        if tmpdir and self.cleanup:
2141            self.remove_dirs(tmpdir)
2142
2143    # end of create_experiment sub-functions
2144
2145    def create_experiment(self, req, fid):
2146        """
2147        The external interface to experiment creation called from the
2148        dispatcher.
2149
2150        Creates a working directory, splits the incoming description using the
2151        splitter script and parses out the various subsections using the
2152        classes above.  Once each sub-experiment is created, use pooled threads
2153        to instantiate them and start it all up.
2154        """
2155
2156        self.log.info("Create experiment call started for %s" % fid)
2157        req = req.get('CreateRequestBody', None)
2158        if req:
2159            key = self.get_experiment_key(req)
2160        else:
2161            raise service_error(service_error.req,
2162                    "Bad request format (no CreateRequestBody)")
2163
2164        # Import information from the requester
2165        # import may partially succeed so always save credentials and warn
2166        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2167            self.log.debug("Failed to import delegation credentials(!)")
2168        self.get_grouper_updates(fid)
2169        self.auth.update()
2170        self.auth.save()
2171
2172        try:
2173            # Make sure that the caller can talk to us
2174            proof = self.check_experiment_access(fid, key)
2175        except service_error, e:
2176            self.log.info("Create experiment call failed for %s: access denied"\
2177                    % fid)
2178            raise e
2179
2180
2181        # Install the testbed map entries supplied with the request into a copy
2182        # of the testbed map.
2183        tbmap = dict(self.tbmap)
2184        for m in req.get('testbedmap', []):
2185            if 'testbed' in m and 'uri' in m:
2186                tbmap[m['testbed']] = m['uri']
2187
2188        # a place to work
2189        try:
2190            tmpdir = tempfile.mkdtemp(prefix="split-")
2191            os.mkdir(tmpdir+"/keys")
2192        except EnvironmentError:
2193            raise service_error(service_error.internal, "Cannot create tmp dir")
2194
2195        tbparams = { }
2196
2197        eid, expid, expcert_file = \
2198                self.get_experiment_ids_and_start(key, tmpdir)
2199
2200        # This catches exceptions to clear the placeholder if necessary
2201        try: 
2202            if not (eid and expid):
2203                raise service_error(service_error.internal, 
2204                        "Cannot find local experiment info!?")
2205
2206            top = self.get_topology(req, tmpdir)
2207            self.confirm_software(top)
2208            # Assign the IPs
2209            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2210            # Find the testbeds to look up
2211            tb_hosts = { }
2212            testbeds = [ ]
2213            for e in top.elements:
2214                if isinstance(e, topdl.Computer):
2215                    tb = e.get_attribute('testbed') or 'default'
2216                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2217                    else: 
2218                        tb_hosts[tb] = [ e.name ]
2219                        testbeds.append(tb)
2220
2221            masters, pmasters = self.get_testbed_services(req, testbeds)
2222            allocated = { }         # Testbeds we can access
2223            topo ={ }               # Sub topologies
2224            connInfo = { }          # Connection information
2225
2226            self.get_access_to_testbeds(testbeds, fid, allocated, 
2227                    tbparams, masters, tbmap, expid, expcert_file)
2228
2229            # tbactive is the set of testbeds that have NATs in front of their
2230            # portals. They need to initiate connections.
2231            tbactive = set([k for k, v in tbparams.items() \
2232                    if v.get_attribute('nat_portals')])
2233
2234            self.split_topology(top, topo, testbeds)
2235
2236            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2237            # XXX: PNNL debug
2238            self.log.debug("Back from generate keys")
2239
2240            part = experiment_partition(self.auth, self.store_url, tbmap,
2241                    self.muxmax, self.direct_transit, tbactive)
2242            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2243                    connInfo, expid)
2244            # XXX: PNNL debug
2245            self.log.debug("Back from add portals")
2246
2247            auth_attrs = set()
2248            # Now get access to the dynamic testbeds (those added above)
2249            for tb in [ t for t in topo if t not in allocated]:
2250                # XXX: PNNL debug
2251                self.log.debug("dynamic testbeds %s" %tb)
2252                self.get_access(tb, tbparams, fid, masters, tbmap, 
2253                        expid, expcert_file)
2254                allocated[tb] = 1
2255                store_keys = topo[tb].get_attribute('store_keys')
2256                # Give the testbed access to keys it exports or imports
2257                if store_keys:
2258                    auth_attrs.update(set([
2259                        (tbparams[tb].allocID, sk) \
2260                                for sk in store_keys.split(" ")]))
2261
2262            # XXX: PNNL debug
2263            self.log.debug("done with dynamic testbeds %s" % auth_attrs)
2264            if auth_attrs:
2265                self.append_experiment_authorization(expid, auth_attrs)
2266
2267            # transit and disconnected testbeds may not have a connInfo entry.
2268            # Fill in the blanks.
2269            for t in allocated.keys():
2270                if not connInfo.has_key(t):
2271                    connInfo[t] = { }
2272
2273            self.wrangle_software(expid, top, topo, tbparams)
2274
2275            proofs = self.save_federant_information(allocated, tbparams, 
2276                    eid, top)
2277        except service_error, e:
2278            # If something goes wrong in the parse (usually an access error)
2279            # clear the placeholder state.  From here on out the code delays
2280            # exceptions.  Failing at this point returns a fault to the remote
2281            # caller.
2282
2283            self.log.info("Create experiment call failed for %s %s: %s" % 
2284                    (eid, fid, e))
2285            self.clear_placeholder(eid, expid, tmpdir)
2286            raise e
2287
2288        # Start the background swapper and return the starting state.  From
2289        # here on out, the state will stick around a while.
2290
2291        # Create a logger that logs to the experiment's state object as well as
2292        # to the main log file.
2293        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2294        alloc_collector = self.list_log(self.state[eid].log)
2295        h = logging.StreamHandler(alloc_collector)
2296        # XXX: there should be a global one of these rather than repeating the
2297        # code.
2298        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2299                    '%d %b %y %H:%M:%S'))
2300        alloc_log.addHandler(h)
2301
2302        # Start a thread to do the resource allocation
2303        t  = Thread(target=self.allocate_resources,
2304                args=(allocated, masters, eid, expid, tbparams, 
2305                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2306                    connInfo, tbmap, expcert_file),
2307                name=eid)
2308        t.start()
2309
2310        rv = {
2311                'experimentID': [
2312                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2313                ],
2314                'experimentStatus': 'starting',
2315                'proof': [ proof.to_dict() ] + proofs,
2316            }
2317        self.log.info("Create experiment call succeeded for %s %s" % \
2318                (eid, fid))
2319
2320        return rv
2321   
2322    def get_experiment_fedid(self, key):
2323        """
2324        find the fedid associated with the localname key in the state database.
2325        """
2326
2327        rv = None
2328        self.state_lock.acquire()
2329        if key in self.state:
2330            rv = self.state[key].fedid
2331        self.state_lock.release()
2332        return rv
2333
2334    def check_experiment_access(self, fid, key):
2335        """
2336        Confirm that the fid has access to the experiment.  Though a request
2337        may be made in terms of a local name, the access attribute is always
2338        the experiment's fedid.
2339        """
2340        if not isinstance(key, fedid):
2341            key = self.get_experiment_fedid(key)
2342
2343        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2344
2345        if access_ok:
2346            return proof
2347        else:
2348            raise service_error(service_error.access, "Access Denied",
2349                proof)
2350
2351
2352    def get_handler(self, path, fid):
2353        """
2354        Perhaps surprisingly named, this function handles HTTP GET requests to
2355        this server (SOAP requests are POSTs).
2356        """
2357        self.log.info("Get handler %s %s" % (path, fid))
2358        if len("%s" % fid) == 0:
2359            return (None, None)
2360        # XXX: log proofs?
2361        if self.auth.check_attribute(fid, path):
2362            return ("%s/%s" % (self.repodir, path), "application/binary")
2363        else:
2364            return (None, None)
2365
2366    def update_info(self, key, force=False):
2367        top = None
2368        self.state_lock.acquire()
2369        if key in self.state:
2370            if force or self.state[key].older_than(self.info_cache_limit):
2371                top = self.state[key].top
2372                if top is not None: top = top.clone()
2373                d1, info_params, cert, d2 = \
2374                        self.get_segment_info(self.state[key], need_lock=False)
2375        self.state_lock.release()
2376
2377        if top is None: return
2378
2379        try:
2380            tmpdir = tempfile.mkdtemp(prefix="info-")
2381        except EnvironmentError:
2382            raise service_error(service_error.internal, 
2383                    "Cannot create tmp dir")
2384        cert_file = self.make_temp_certfile(cert, tmpdir)
2385
2386        data = []
2387        try:
2388            for k, (uri, aid) in info_params.items():
2389                info=self.info_segment(log=self.log, testbed=uri,
2390                            cert_file=cert_file, cert_pwd=None,
2391                            trusted_certs=self.trusted_certs,
2392                            caller=self.call_InfoSegment)
2393                info(uri, aid)
2394                data.append(info)
2395        # Clean up the tmpdir no matter what
2396        finally:
2397            if tmpdir: self.remove_dirs(tmpdir)
2398
2399        self.annotate_topology(top, data)
2400        self.state_lock.acquire()
2401        if key in self.state:
2402            self.state[key].top = top
2403            self.state[key].updated()
2404            if self.state_filename: self.write_state()
2405        self.state_lock.release()
2406
2407   
2408    def get_info(self, req, fid):
2409        """
2410        Return all the stored info about this experiment
2411        """
2412        rv = None
2413
2414        self.log.info("Info call started for %s" %  fid)
2415        req = req.get('InfoRequestBody', None)
2416        if not req:
2417            raise service_error(service_error.req,
2418                    "Bad request format (no InfoRequestBody)")
2419        exp = req.get('experiment', None)
2420        legacy = req.get('legacy', False)
2421        fresh = req.get('fresh', False)
2422        if exp:
2423            if exp.has_key('fedid'):
2424                key = exp['fedid']
2425                keytype = "fedid"
2426            elif exp.has_key('localname'):
2427                key = exp['localname']
2428                keytype = "localname"
2429            else:
2430                raise service_error(service_error.req, "Unknown lookup type")
2431        else:
2432            raise service_error(service_error.req, "No request?")
2433
2434        try:
2435            proof = self.check_experiment_access(fid, key)
2436        except service_error, e:
2437            self.log.info("Info call failed for %s: access denied" %  fid)
2438
2439
2440        self.update_info(key, fresh)
2441
2442        self.state_lock.acquire()
2443        if self.state.has_key(key):
2444            rv = self.state[key].get_info()
2445            # Copy the topo if we need legacy annotations
2446            if legacy:
2447                top = self.state[key].top
2448                if top is not None: top = top.clone()
2449        self.state_lock.release()
2450        self.log.info("Gathered Info for %s %s" % (key, fid))
2451
2452        # If the legacy visualization and topology representations are
2453        # requested, calculate them and add them to the return.
2454        if legacy and rv is not None:
2455            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2456            if top is not None:
2457                vtopo = topdl.topology_to_vtopo(top)
2458                if vtopo is not None:
2459                    rv['vtopo'] = vtopo
2460                    try:
2461                        vis = self.genviz(vtopo)
2462                    except service_error, e:
2463                        self.log.debug('Problem generating visualization: %s' \
2464                                % e)
2465                        vis = None
2466                    if vis is not None:
2467                        rv['vis'] = vis
2468        if rv:
2469            self.log.info("Info succeded for %s %s" % (key, fid))
2470            rv['proof'] = proof.to_dict()
2471            return rv
2472        else: 
2473            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2474            raise service_error(service_error.req, "No such experiment")
2475
2476    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2477            results):
2478        """
2479        Call OperateSegment on multiple testbeds and gather the results.
2480        op_params contains the parameters needed to contact that testbed, cert
2481        is a certificate containing the fedid to use, op is the operation,
2482        testbeds is a dict mapping testbed name to targets in that testbed,
2483        params are the parameters to include a,d results is a growing list of
2484        the results of the calls.
2485        """
2486        try:
2487            tmpdir = tempfile.mkdtemp(prefix="info-")
2488        except EnvironmentError:
2489            raise service_error(service_error.internal, 
2490                    "Cannot create tmp dir")
2491        cert_file = self.make_temp_certfile(cert, tmpdir)
2492
2493        try:
2494            for tb, targets in testbeds.items():
2495                if tb in op_params:
2496                    uri, aid = op_params[tb]
2497                    operate=self.operation_segment(log=self.log, testbed=uri,
2498                                cert_file=cert_file, cert_pwd=None,
2499                                trusted_certs=self.trusted_certs,
2500                                caller=self.call_OperationSegment)
2501                    if operate(uri, aid, op, targets, params):
2502                        if operate.status is not None:
2503                            results.extend(operate.status)
2504                            continue
2505                # Something went wrong in a weird way.  Add statuses
2506                # that reflect that to results
2507                for t in targets:
2508                    results.append(operation_status(t, 
2509                        operation_status.federant,
2510                        'Unexpected error on %s' % tb))
2511        # Clean up the tmpdir no matter what
2512        finally:
2513            if tmpdir: self.remove_dirs(tmpdir)
2514
2515    def do_operation(self, req, fid):
2516        """
2517        Find the testbeds holding each target and ask them to carry out the
2518        operation.  Return the statuses.
2519        """
2520        # Map an element to the testbed containing it
2521        def element_to_tb(e):
2522            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2523            elif isinstance(e, topdl.Testbed): return e.name
2524            else: return None
2525        # If d is an operation_status object, make it a dict
2526        def make_dict(d):
2527            if isinstance(d, dict): return d
2528            elif isinstance(d, operation_status): return d.to_dict()
2529            else: return { }
2530
2531        def element_name(e):
2532            if isinstance(e, topdl.Computer): return e.name
2533            elif isinstance(e, topdl.Testbed): 
2534                if e.localname: return e.localname[0]
2535                else: return None
2536            else: return None
2537
2538        self.log.info("Operation call started for %s" %  fid)
2539        req = req.get('OperationRequestBody', None)
2540        if not req:
2541            raise service_error(service_error.req,
2542                    "Bad request format (no OperationRequestBody)")
2543        exp = req.get('experiment', None)
2544        op = req.get('operation', None)
2545        targets = set(req.get('target', []))
2546        params = req.get('parameter', None)
2547
2548        if exp:
2549            if 'fedid' in exp:
2550                key = exp['fedid']
2551                keytype = "fedid"
2552            elif 'localname' in exp:
2553                key = exp['localname']
2554                keytype = "localname"
2555            else:
2556                raise service_error(service_error.req, "Unknown lookup type")
2557        else:
2558            raise service_error(service_error.req, "No request?")
2559
2560        if op is None or not targets:
2561            raise service_error(service_error.req, "No request?")
2562
2563        try:
2564            proof = self.check_experiment_access(fid, key)
2565        except service_error, e:
2566            self.log.info("Operation call failed for %s: access denied" %  fid)
2567            raise e
2568
2569        self.state_lock.acquire()
2570        if key in self.state:
2571            d1, op_params, cert, d2 = \
2572                    self.get_segment_info(self.state[key], need_lock=False,
2573                            key='tb')
2574            top = self.state[key].top
2575            if top is not None:
2576                top = top.clone()
2577        self.state_lock.release()
2578
2579        if top is None:
2580            self.log.info("Operation call failed for %s: not active" %  fid)
2581            raise service_error(service_error.partial, "No topology yet", 
2582                    proof=proof)
2583
2584        testbeds = { }
2585        results = []
2586        for e in top.elements:
2587            ename = element_name(e)
2588            if ename in targets:
2589                tb = element_to_tb(e)
2590                targets.remove(ename)
2591                if tb is not None:
2592                    if tb in testbeds: testbeds[tb].append(ename)
2593                    else: testbeds[tb] = [ ename ]
2594                else:
2595                    results.append(operation_status(e.name, 
2596                        code=operation_status.no_target, 
2597                        description='Cannot map target to testbed'))
2598
2599        for t in targets:
2600            results.append(operation_status(t, operation_status.no_target))
2601
2602        self.operate_on_segments(op_params, cert, op, testbeds, params,
2603                results)
2604
2605        self.log.info("Operation call succeeded for %s" %  fid)
2606        return { 
2607                'experiment': exp, 
2608                'status': [make_dict(r) for r in results],
2609                'proof': proof.to_dict()
2610                }
2611
2612
2613    def get_multi_info(self, req, fid):
2614        """
2615        Return all the stored info that this fedid can access
2616        """
2617        rv = { 'info': [ ], 'proof': [ ] }
2618
2619        self.log.info("Multi Info call started for %s" %  fid)
2620        self.get_grouper_updates(fid)
2621        self.auth.update()
2622        self.auth.save()
2623        self.state_lock.acquire()
2624        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2625            try:
2626                proof = self.check_experiment_access(fid, key)
2627            except service_error, e:
2628                if e.code == service_error.access:
2629                    continue
2630                else:
2631                    self.log.info("Multi Info call failed for %s: %s" %  \
2632                            (e,fid))
2633                    self.state_lock.release()
2634                    raise e
2635
2636            if self.state.has_key(key):
2637                e = self.state[key].get_info()
2638                e['proof'] = proof.to_dict()
2639                rv['info'].append(e)
2640                rv['proof'].append(proof.to_dict())
2641        self.state_lock.release()
2642        self.log.info("Multi Info call succeeded for %s" %  fid)
2643        return rv
2644
2645    def check_termination_status(self, fed_exp, force):
2646        """
2647        Confirm that the experiment is sin a valid state to stop (or force it)
2648        return the state - invalid states for deletion and force settings cause
2649        exceptions.
2650        """
2651        self.state_lock.acquire()
2652        status = fed_exp.status
2653
2654        if status:
2655            if status in ('starting', 'terminating'):
2656                if not force:
2657                    self.state_lock.release()
2658                    raise service_error(service_error.partial, 
2659                            'Experiment still being created or destroyed')
2660                else:
2661                    self.log.warning('Experiment in %s state ' % status + \
2662                            'being terminated by force.')
2663            self.state_lock.release()
2664            return status
2665        else:
2666            # No status??? trouble
2667            self.state_lock.release()
2668            raise service_error(service_error.internal,
2669                    "Experiment has no status!?")
2670
2671    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2672        ids = []
2673        term_params = { }
2674        if need_lock: self.state_lock.acquire()
2675        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2676        expcert = fed_exp.identity
2677        repo = "%s" % fed_exp.fedid
2678
2679        # Collect the allocation/segment ids into a dict keyed by the fedid
2680        # of the allocation that contains a tuple of uri, aid
2681        for i, fed in enumerate(fed_exp.get_all_allocations()):
2682            uri = fed.uri
2683            aid = fed.allocID
2684            if key == 'aid': term_params[aid] = (uri, aid)
2685            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2686
2687        if need_lock: self.state_lock.release()
2688        return ids, term_params, expcert, repo
2689
2690
2691    def get_termination_info(self, fed_exp):
2692        self.state_lock.acquire()
2693        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2694        # Change the experiment state
2695        fed_exp.status = 'terminating'
2696        fed_exp.updated()
2697        if self.state_filename: self.write_state()
2698        self.state_lock.release()
2699
2700        return ids, term_params, expcert, repo
2701
2702
2703    def deallocate_resources(self, term_params, expcert, status, force, 
2704            dealloc_log):
2705        tmpdir = None
2706        # This try block makes sure the tempdir is cleared
2707        try:
2708            # If no expcert, try the deallocation as the experiment
2709            # controller instance.
2710            if expcert and self.auth_type != 'legacy': 
2711                try:
2712                    tmpdir = tempfile.mkdtemp(prefix="term-")
2713                except EnvironmentError:
2714                    raise service_error(service_error.internal, 
2715                            "Cannot create tmp dir")
2716                cert_file = self.make_temp_certfile(expcert, tmpdir)
2717                pw = None
2718            else: 
2719                cert_file = self.cert_file
2720                pw = self.cert_pwd
2721
2722            # Stop everyone.  NB, wait_for_all waits until a thread starts
2723            # and then completes, so we can't wait if nothing starts.  So,
2724            # no tbparams, no start.
2725            if len(term_params) > 0:
2726                tp = thread_pool(self.nthreads)
2727                for k, (uri, aid) in term_params.items():
2728                    # Create and start a thread to stop the segment
2729                    tp.wait_for_slot()
2730                    t  = pooled_thread(\
2731                            target=self.terminate_segment(log=dealloc_log,
2732                                testbed=uri,
2733                                cert_file=cert_file, 
2734                                cert_pwd=pw,
2735                                trusted_certs=self.trusted_certs,
2736                                caller=self.call_TerminateSegment),
2737                            args=(uri, aid), name=k,
2738                            pdata=tp, trace_file=self.trace_file)
2739                    t.start()
2740                # Wait for completions
2741                tp.wait_for_all_done()
2742
2743            # release the allocations (failed experiments have done this
2744            # already, and starting experiments may be in odd states, so we
2745            # ignore errors releasing those allocations
2746            try: 
2747                for k, (uri, aid)  in term_params.items():
2748                    self.release_access(None, aid, uri=uri,
2749                            cert_file=cert_file, cert_pwd=pw)
2750            except service_error, e:
2751                if status != 'failed' and not force:
2752                    raise e
2753
2754        # Clean up the tmpdir no matter what
2755        finally:
2756            if tmpdir: self.remove_dirs(tmpdir)
2757
2758    def terminate_experiment(self, req, fid):
2759        """
2760        Swap this experiment out on the federants and delete the shared
2761        information
2762        """
2763        self.log.info("Terminate experiment call started for %s" % fid)
2764        tbparams = { }
2765        req = req.get('TerminateRequestBody', None)
2766        if not req:
2767            raise service_error(service_error.req,
2768                    "Bad request format (no TerminateRequestBody)")
2769
2770        key = self.get_experiment_key(req, 'experiment')
2771        try:
2772            proof = self.check_experiment_access(fid, key)
2773        except service_error, e:
2774            self.log.info(
2775                    "Terminate experiment call failed for %s: access denied" \
2776                            % fid)
2777            raise e
2778        exp = req.get('experiment', False)
2779        force = req.get('force', False)
2780
2781        dealloc_list = [ ]
2782
2783
2784        # Create a logger that logs to the dealloc_list as well as to the main
2785        # log file.
2786        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2787        dealloc_log.info("Terminating %s " %key)
2788        h = logging.StreamHandler(self.list_log(dealloc_list))
2789        # XXX: there should be a global one of these rather than repeating the
2790        # code.
2791        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2792                    '%d %b %y %H:%M:%S'))
2793        dealloc_log.addHandler(h)
2794
2795        self.state_lock.acquire()
2796        fed_exp = self.state.get(key, None)
2797        self.state_lock.release()
2798        repo = None
2799
2800        if fed_exp:
2801            status = self.check_termination_status(fed_exp, force)
2802            # get_termination_info updates the experiment state
2803            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2804            self.deallocate_resources(term_params, expcert, status, force, 
2805                    dealloc_log)
2806
2807            # Remove the terminated experiment
2808            self.state_lock.acquire()
2809            for id in ids:
2810                self.clear_experiment_authorization(id, need_state_lock=False)
2811                if id in self.state: del self.state[id]
2812
2813            if self.state_filename: self.write_state()
2814            self.state_lock.release()
2815
2816            # Delete any synch points associated with this experiment.  All
2817            # synch points begin with the fedid of the experiment.
2818            fedid_keys = set(["fedid:%s" % f for f in ids \
2819                    if isinstance(f, fedid)])
2820            for k in self.synch_store.all_keys():
2821                try:
2822                    if len(k) > 45 and k[0:46] in fedid_keys:
2823                        self.synch_store.del_value(k)
2824                except synch_store.BadDeletionError:
2825                    pass
2826            self.write_store()
2827
2828            # Remove software and other cached stuff from the filesystem.
2829            if repo:
2830                self.remove_dirs("%s/%s" % (self.repodir, repo))
2831       
2832            self.log.info("Terminate experiment succeeded for %s %s" % \
2833                    (key, fid))
2834            return { 
2835                    'experiment': exp , 
2836                    'deallocationLog': string.join(dealloc_list, ''),
2837                    'proof': [proof.to_dict()],
2838                    }
2839        else:
2840            self.log.info("Terminate experiment failed for %s %s: no state" % \
2841                    (key, fid))
2842            raise service_error(service_error.req, "No saved state")
2843
2844
2845    def GetValue(self, req, fid):
2846        """
2847        Get a value from the synchronized store
2848        """
2849        req = req.get('GetValueRequestBody', None)
2850        if not req:
2851            raise service_error(service_error.req,
2852                    "Bad request format (no GetValueRequestBody)")
2853       
2854        name = req.get('name', None)
2855        wait = req.get('wait', False)
2856        rv = { 'name': name }
2857
2858        if not name:
2859            raise service_error(service_error.req, "No name?")
2860
2861        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2862
2863        if access_ok:
2864            self.log.debug("[GetValue] asking for %s " % name)
2865            try:
2866                v = self.synch_store.get_value(name, wait)
2867            except synch_store.RevokedKeyError:
2868                # No more synch on this key
2869                raise service_error(service_error.federant, 
2870                        "Synch key %s revoked" % name)
2871            if v is not None:
2872                rv['value'] = v
2873            rv['proof'] = proof.to_dict()
2874            self.log.debug("[GetValue] got %s from %s" % (v, name))
2875            return rv
2876        else:
2877            raise service_error(service_error.access, "Access Denied",
2878                    proof=proof)
2879       
2880
2881    def SetValue(self, req, fid):
2882        """
2883        Set a value in the synchronized store
2884        """
2885        req = req.get('SetValueRequestBody', None)
2886        if not req:
2887            raise service_error(service_error.req,
2888                    "Bad request format (no SetValueRequestBody)")
2889       
2890        name = req.get('name', None)
2891        v = req.get('value', '')
2892
2893        if not name:
2894            raise service_error(service_error.req, "No name?")
2895
2896        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2897
2898        if access_ok:
2899            try:
2900                self.synch_store.set_value(name, v)
2901                self.write_store()
2902                self.log.debug("[SetValue] set %s to %s" % (name, v))
2903            except synch_store.CollisionError:
2904                # Translate into a service_error
2905                raise service_error(service_error.req,
2906                        "Value already set: %s" %name)
2907            except synch_store.RevokedKeyError:
2908                # No more synch on this key
2909                raise service_error(service_error.federant, 
2910                        "Synch key %s revoked" % name)
2911                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2912        else:
2913            raise service_error(service_error.access, "Access Denied",
2914                    proof=proof)
Note: See TracBrowser for help on using the repository browser.