source: fedd/federation/experiment_control.py @ 17c6d91

Last change on this file since 17c6d91 was 17c6d91, checked in by Ted Faber <faber@…>, 12 years ago

More principled default testbed handling

  • Property mode set to 100644
File size: 99.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46from deter import topology_to_route_file
47import list_log
48
49
50class nullHandler(logging.Handler):
51    def emit(self, record): pass
52
53fl = logging.getLogger("fedd.experiment_control")
54fl.addHandler(nullHandler())
55
56class experiment_control_local(experiment_control_legacy):
57    """
58    Control of experiments that this system can directly access.
59
60    Includes experiment creation, termination and information dissemination.
61    Thred safe.
62    """
63
64    class ssh_cmd_timeout(RuntimeError): pass
65   
66    call_RequestAccess = service_caller('RequestAccess')
67    call_ReleaseAccess = service_caller('ReleaseAccess')
68    call_StartSegment = service_caller('StartSegment')
69    call_TerminateSegment = service_caller('TerminateSegment')
70    call_InfoSegment = service_caller('InfoSegment')
71    call_OperationSegment = service_caller('OperationSegment')
72    call_Ns2Topdl = service_caller('Ns2Topdl')
73
74    def __init__(self, config=None, auth=None):
75        """
76        Intialize the various attributes, most from the config object
77        """
78
79        def parse_tarfile_list(tf):
80            """
81            Parse a tarfile list from the configuration.  This is a set of
82            paths and tarfiles separated by spaces.
83            """
84            rv = [ ]
85            if tf is not None:
86                tl = tf.split()
87                while len(tl) > 1:
88                    p, t = tl[0:2]
89                    del tl[0:2]
90                    rv.append((p, t))
91            return rv
92
93        self.list_log = list_log.list_log
94
95        self.cert_file = config.get("experiment_control", "cert_file")
96        if self.cert_file:
97            self.cert_pwd = config.get("experiment_control", "cert_pwd")
98        else:
99            self.cert_file = config.get("globals", "cert_file")
100            self.cert_pwd = config.get("globals", "cert_pwd")
101
102        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
103                or config.get("globals", "trusted_certs")
104
105        self.repodir = config.get("experiment_control", "repodir")
106        self.repo_url = config.get("experiment_control", "repo_url", 
107                "https://users.isi.deterlab.net:23235");
108
109        self.exp_stem = "fed-stem"
110        self.log = logging.getLogger("fedd.experiment_control")
111        set_log_level(config, "experiment_control", self.log)
112        self.muxmax = 2
113        self.nthreads = 10
114        self.randomize_experiments = False
115
116        self.splitter = None
117        self.ssh_keygen = "/usr/bin/ssh-keygen"
118        self.ssh_identity_file = None
119
120
121        self.debug = config.getboolean("experiment_control", "create_debug")
122        self.cleanup = not config.getboolean("experiment_control", 
123                "leave_tmpfiles")
124        self.state_filename = config.get("experiment_control", 
125                "experiment_state")
126        self.store_filename = config.get("experiment_control", 
127                "synch_store")
128        self.store_url = config.get("experiment_control", "store_url")
129        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
130        self.grouper_url = config.get("experiment_control", "grouper_url")
131        self.fedkit = parse_tarfile_list(\
132                config.get("experiment_control", "fedkit"))
133        self.gatewaykit = parse_tarfile_list(\
134                config.get("experiment_control", "gatewaykit"))
135
136        dt = config.get("experiment_control", "direct_transit")
137        self.auth_type = config.get('experiment_control', 'auth_type') \
138                or 'legacy'
139        self.auth_dir = config.get('experiment_control', 'auth_dir')
140        self.routing = config.get('experiment_control', 'routing')
141        self.default_tb = config.get('experiment_control', 'default_testbed')
142        # XXX: document this!
143        self.info_cache_limit = \
144                config.getint('experiment_control', 'info_cache', 600)
145        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
146        else: self.direct_transit = [ ]
147        # NB for internal master/slave ops, not experiment setup
148        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
149
150        self.overrides = set([])
151        ovr = config.get('experiment_control', 'overrides')
152        if ovr:
153            for o in ovr.split(","):
154                o = o.strip()
155                if o.startswith('fedid:'): o = o[len('fedid:'):]
156                self.overrides.add(fedid(hexstr=o))
157
158        self.state = { }
159        self.state_lock = Lock()
160        self.tclsh = "/usr/local/bin/otclsh"
161        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
162                config.get("experiment_control", "tcl_splitter",
163                        "/usr/testbed/lib/ns2ir/parse.tcl")
164        mapdb_file = config.get("experiment_control", "mapdb")
165        self.trace_file = sys.stderr
166
167        self.def_expstart = \
168                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
169                "/tmp/federate";
170        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
171                "FEDDIR/hosts";
172        self.def_gwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
174                "/tmp/bridge.log";
175        self.def_mgwstart = \
176                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
177                "/tmp/bridge.log";
178        self.def_gwimage = "FBSD61-TUNNEL2";
179        self.def_gwtype = "pc";
180        self.local_access = { }
181
182        if self.auth_type == 'legacy':
183            if auth:
184                self.auth = auth
185            else:
186                self.log.error( "[access]: No authorizer initialized, " +\
187                        "creating local one.")
188                auth = authorizer()
189            self.get_access = self.legacy_get_access
190        elif self.auth_type == 'abac':
191            self.auth = abac_authorizer(load=self.auth_dir, 
192                    update=os.path.join(self.auth_dir,'update'))
193        else:
194            raise service_error(service_error.internal, 
195                    "Unknown auth_type: %s" % self.auth_type)
196
197        if mapdb_file:
198            self.read_mapdb(mapdb_file)
199        else:
200            self.log.warn("[experiment_control] No testbed map, using defaults")
201            self.tbmap = { 
202                    'deter':'https://users.isi.deterlab.net:23235',
203                    'emulab':'https://users.isi.deterlab.net:23236',
204                    'ucb':'https://users.isi.deterlab.net:23237',
205                    }
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323    def read_mapdb(self, file):
324        """
325        Read a simple colon separated list of mappings for the
326        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
327        also adds testbeds to active if they include , active after
328        their name.
329        """
330
331        self.tbmap = { }
332        lineno =0
333        try:
334            f = open(file, "r")
335            for line in f:
336                lineno += 1
337                line = line.strip()
338                if line.startswith('#') or len(line) == 0:
339                    continue
340                try:
341                    label, url = line.split(':', 1)
342                    self.tbmap[label] = url
343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
346        except EnvironmentError, e:
347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
349        else:
350            f.close()
351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
358        except EnvironmentError, e:
359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
391        except EnvironmentError, e:
392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
397    # XXX this may belong somewhere else
398
399    def get_grouper_updates(self, fid):
400        if self.grouper_url is None: return
401        d = tempfile.mkdtemp()
402        try:
403            fstr = "%s" % fid
404            # XXX locking
405            zipname = os.path.join(d, 'grouper.zip')
406            dest = os.path.join(self.auth_dir, 'update')
407            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
408            f = open(zipname, 'w')
409            f.write(resp.read())
410            f.close()
411            zf = zipfile.ZipFile(zipname, 'r')
412            zf.extractall(dest)
413            zf.close()
414        except URLError, e:
415            self.log.error("Cannot connect to grouper: %s" % e)
416            pass
417        finally:
418            shutil.rmtree(d)
419
420
421    def remove_dirs(self, dir):
422        """
423        Remove the directory tree and all files rooted at dir.  Log any errors,
424        but continue.
425        """
426        self.log.debug("[removedirs]: removing %s" % dir)
427        try:
428            for path, dirs, files in os.walk(dir, topdown=False):
429                for f in files:
430                    os.remove(os.path.join(path, f))
431                for d in dirs:
432                    os.rmdir(os.path.join(path, d))
433            os.rmdir(dir)
434        except EnvironmentError, e:
435            self.log.error("Error deleting directory tree in %s" % e);
436
437    @staticmethod
438    def make_temp_certfile(expcert, tmpdir):
439        """
440        make a protected copy of the access certificate so the experiment
441        controller can act as the experiment principal.  mkstemp is the most
442        secure way to do that. The directory should be created by
443        mkdtemp.  Return the filename.
444        """
445        if expcert and tmpdir:
446            try:
447                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
448                f = os.fdopen(certf, 'w')
449                print >> f, expcert
450                f.close()
451            except EnvironmentError, e:
452                raise service_error(service_error.internal, 
453                        "Cannot create temp cert file?")
454            return certfn
455        else:
456            return None
457
458       
459    def generate_ssh_keys(self, dest, type="rsa" ):
460        """
461        Generate a set of keys for the gateways to use to talk.
462
463        Keys are of type type and are stored in the required dest file.
464        """
465        valid_types = ("rsa", "dsa")
466        t = type.lower();
467        if t not in valid_types: raise ValueError
468        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
469
470        try:
471            trace = open("/dev/null", "w")
472        except EnvironmentError:
473            raise service_error(service_error.internal,
474                    "Cannot open /dev/null??");
475
476        # May raise CalledProcessError
477        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
478        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
479        if rv != 0:
480            raise service_error(service_error.internal, 
481                    "Cannot generate nonce ssh keys.  %s return code %d" \
482                            % (self.ssh_keygen, rv))
483
484    def generate_seer_certs(self, destdir):
485        '''
486        Create a SEER ca cert and a node cert in destdir/ca.pem and
487        destdir/node.pem respectively.  These will be distributed throughout
488        the federated experiment.  This routine reports errors via
489        service_errors.
490        '''
491        openssl = '/usr/bin/openssl'
492        # All the filenames and parameters we need for openssl calls below
493        ca_key =os.path.join(destdir, 'ca.key') 
494        ca_pem = os.path.join(destdir, 'ca.pem')
495        node_key =os.path.join(destdir, 'node.key') 
496        node_pem = os.path.join(destdir, 'node.pem')
497        node_req = os.path.join(destdir, 'node.req')
498        node_signed = os.path.join(destdir, 'node.signed')
499        days = '%s' % (365 * 10)
500        serial = '%s' % random.randint(0, 1<<16)
501
502        try:
503            # Sequence of calls to create a CA key, create a ca cert, create a
504            # node key, node signing request, and finally a signed node
505            # certificate.
506            sequence = (
507                    (openssl, 'genrsa', '-out', ca_key, '1024'),
508                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
509                        ca_pem, '-days', days, '-subj', 
510                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
511                    (openssl, 'genrsa', '-out', node_key, '1024'),
512                    (openssl, 'req', '-new', '-key', node_key, '-out', 
513                        node_req, '-days', days, '-subj', 
514                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
515                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
516                        '-set_serial', serial, '-req', '-in', node_req, 
517                        '-out', node_signed, '-days', days),
518                )
519            # Do all that stuff; bail if there's an error, and push all the
520            # output to dev/null.
521            for cmd in sequence:
522                trace = open("/dev/null", "w")
523                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
524                if rv != 0:
525                    raise service_error(service_error.internal, 
526                            "Cannot generate SEER certs.  %s return code %d" \
527                                    % (' '.join(cmd), rv))
528            # Concatinate the node key and signed certificate into node.pem
529            f = open(node_pem, 'w')
530            for comp in (node_signed, node_key):
531                g = open(comp, 'r')
532                f.write(g.read())
533                g.close()
534            f.close()
535
536            # Throw out intermediaries.
537            for fn in (ca_key, node_key, node_req, node_signed):
538                os.unlink(fn)
539
540        except EnvironmentError, e:
541            # Any difficulties with the file system wind up here
542            raise service_error(service_error.internal,
543                    "File error on  %s while creating SEER certs: %s" % \
544                            (e.filename, e.strerror))
545
546
547
548    def gentopo(self, str):
549        """
550        Generate the topology data structure from the splitter's XML
551        representation of it.
552
553        The topology XML looks like:
554            <experiment>
555                <nodes>
556                    <node><vname></vname><ips>ip1:ip2</ips></node>
557                </nodes>
558                <lans>
559                    <lan>
560                        <vname></vname><vnode></vnode><ip></ip>
561                        <bandwidth></bandwidth><member>node:port</member>
562                    </lan>
563                </lans>
564        """
565        class topo_parse:
566            """
567            Parse the topology XML and create the dats structure.
568            """
569            def __init__(self):
570                # Typing of the subelements for data conversion
571                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
572                self.int_subelements = ( 'bandwidth',)
573                self.float_subelements = ( 'delay',)
574                # The final data structure
575                self.nodes = [ ]
576                self.lans =  [ ]
577                self.topo = { \
578                        'node': self.nodes,\
579                        'lan' : self.lans,\
580                    }
581                self.element = { }  # Current element being created
582                self.chars = ""     # Last text seen
583
584            def end_element(self, name):
585                # After each sub element the contents is added to the current
586                # element or to the appropriate list.
587                if name == 'node':
588                    self.nodes.append(self.element)
589                    self.element = { }
590                elif name == 'lan':
591                    self.lans.append(self.element)
592                    self.element = { }
593                elif name in self.str_subelements:
594                    self.element[name] = self.chars
595                    self.chars = ""
596                elif name in self.int_subelements:
597                    self.element[name] = int(self.chars)
598                    self.chars = ""
599                elif name in self.float_subelements:
600                    self.element[name] = float(self.chars)
601                    self.chars = ""
602
603            def found_chars(self, data):
604                self.chars += data.rstrip()
605
606
607        tp = topo_parse();
608        parser = xml.parsers.expat.ParserCreate()
609        parser.EndElementHandler = tp.end_element
610        parser.CharacterDataHandler = tp.found_chars
611
612        parser.Parse(str)
613
614        return tp.topo
615       
616
617    def genviz(self, topo):
618        """
619        Generate the visualization the virtual topology
620        """
621
622        neato = "/usr/local/bin/neato"
623        # These are used to parse neato output and to create the visualization
624        # file.
625        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
626        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
627                "%s</type></node>"
628
629        try:
630            # Node names
631            nodes = [ n['vname'] for n in topo['node'] ]
632            topo_lans = topo['lan']
633        except KeyError, e:
634            raise service_error(service_error.internal, "Bad topology: %s" %e)
635
636        lans = { }
637        links = { }
638
639        # Walk through the virtual topology, organizing the connections into
640        # 2-node connections (links) and more-than-2-node connections (lans).
641        # When a lan is created, it's added to the list of nodes (there's a
642        # node in the visualization for the lan).
643        for l in topo_lans:
644            if links.has_key(l['vname']):
645                if len(links[l['vname']]) < 2:
646                    links[l['vname']].append(l['vnode'])
647                else:
648                    nodes.append(l['vname'])
649                    lans[l['vname']] = links[l['vname']]
650                    del links[l['vname']]
651                    lans[l['vname']].append(l['vnode'])
652            elif lans.has_key(l['vname']):
653                lans[l['vname']].append(l['vnode'])
654            else:
655                links[l['vname']] = [ l['vnode'] ]
656
657
658        # Open up a temporary file for dot to turn into a visualization
659        try:
660            df, dotname = tempfile.mkstemp()
661            dotfile = os.fdopen(df, 'w')
662        except EnvironmentError:
663            raise service_error(service_error.internal,
664                    "Failed to open file in genviz")
665
666        try:
667            dnull = open('/dev/null', 'w')
668        except EnvironmentError:
669            service_error(service_error.internal,
670                    "Failed to open /dev/null in genviz")
671
672        # Generate a dot/neato input file from the links, nodes and lans
673        try:
674            print >>dotfile, "graph G {"
675            for n in nodes:
676                print >>dotfile, '\t"%s"' % n
677            for l in links.keys():
678                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
679            for l in lans.keys():
680                for n in lans[l]:
681                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
682            print >>dotfile, "}"
683            dotfile.close()
684        except TypeError:
685            raise service_error(service_error.internal,
686                    "Single endpoint link in vtopo")
687        except EnvironmentError:
688            raise service_error(service_error.internal, "Cannot write dot file")
689
690        # Use dot to create a visualization
691        try:
692            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
693                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
694                stderr=dnull, close_fds=True)
695        except EnvironmentError:
696            raise service_error(service_error.internal, 
697                    "Cannot generate visualization: is graphviz available?")
698        dnull.close()
699
700        # Translate dot to vis format
701        vis_nodes = [ ]
702        vis = { 'node': vis_nodes }
703        for line in dot.stdout:
704            m = vis_re.match(line)
705            if m:
706                vn = m.group(1)
707                vis_node = {'name': vn, \
708                        'x': float(m.group(2)),\
709                        'y' : float(m.group(3)),\
710                    }
711                if vn in links.keys() or vn in lans.keys():
712                    vis_node['type'] = 'lan'
713                else:
714                    vis_node['type'] = 'node'
715                vis_nodes.append(vis_node)
716        rv = dot.wait()
717
718        os.remove(dotname)
719        # XXX: graphviz seems to use low return codes for warnings, like
720        # "couldn't find font"
721        if rv < 2 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : 
740                        {'allocID': {'fedid': aid}},}, 
741                    fedid(file=cert_file))
742            resp = { 'ReleaseAccessResponseBody': resp } 
743        else:
744            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
745                    cert_file, cert_pwd, self.trusted_certs)
746
747        # better error coding
748
749    def remote_ns2topdl(self, uri, desc):
750
751        req = {
752                'description' : { 'ns2description': desc },
753            }
754
755        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
756                self.trusted_certs)
757
758        if r.has_key('Ns2TopdlResponseBody'):
759            r = r['Ns2TopdlResponseBody']
760            ed = r.get('experimentdescription', None)
761            if ed.has_key('topdldescription'):
762                return topdl.Topology(**ed['topdldescription'])
763            else:
764                raise service_error(service_error.protocol, 
765                        "Bad splitter response (no output)")
766        else:
767            raise service_error(service_error.protocol, "Bad splitter response")
768
769    class start_segment:
770        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
771                cert_pwd=None, trusted_certs=None, caller=None,
772                log_collector=None):
773            self.log = log
774            self.debug = debug
775            self.cert_file = cert_file
776            self.cert_pwd = cert_pwd
777            self.trusted_certs = None
778            self.caller = caller
779            self.testbed = testbed
780            self.log_collector = log_collector
781            self.response = None
782            self.node = { }
783            self.subs = { }
784            self.tb = { }
785            self.proof = None
786
787        def make_map(self, resp):
788            if 'segmentdescription' not in resp  or \
789                    'topdldescription' not in resp['segmentdescription']:
790                self.log.warn('No topology returned from startsegment')
791                return 
792
793            top = topdl.Topology(
794                    **resp['segmentdescription']['topdldescription'])
795
796            for e in top.elements:
797                if isinstance(e, topdl.Computer):
798                    self.node[e.name] = e
799                elif isinstance(e, topdl.Testbed):
800                    self.tb[e.uri] = e
801            for s in top.substrates:
802                self.subs[s.name] = s
803
804        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
805            req = {
806                    'allocID': { 'fedid' : aid }, 
807                    'segmentdescription': { 
808                        'topdldescription': topo.to_dict(),
809                    },
810                }
811
812            if connInfo:
813                req['connection'] = connInfo
814
815            import_svcs = [ s for m in masters.values() \
816                    for s in m if self.testbed in s.importers]
817
818            if import_svcs or self.testbed in masters:
819                req['service'] = []
820
821            for s in import_svcs:
822                for r in s.reqs:
823                    sr = copy.deepcopy(r)
824                    sr['visibility'] = 'import';
825                    req['service'].append(sr)
826
827            for s in masters.get(self.testbed, []):
828                for r in s.reqs:
829                    sr = copy.deepcopy(r)
830                    sr['visibility'] = 'export';
831                    req['service'].append(sr)
832
833            if attrs:
834                req['fedAttr'] = attrs
835
836            try:
837                self.log.debug("Calling StartSegment at %s " % uri)
838                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
839                        self.trusted_certs)
840                if r.has_key('StartSegmentResponseBody'):
841                    lval = r['StartSegmentResponseBody'].get('allocationLog',
842                            None)
843                    if lval and self.log_collector:
844                        for line in  lval.splitlines(True):
845                            self.log_collector.write(line)
846                    self.make_map(r['StartSegmentResponseBody'])
847                    if 'proof' in r: self.proof = r['proof']
848                    self.response = r
849                else:
850                    raise service_error(service_error.internal, 
851                            "Bad response!?: %s" %r)
852                return True
853            except service_error, e:
854                self.log.error("Start segment failed on %s: %s" % \
855                        (self.testbed, e))
856                return False
857
858
859
860    class terminate_segment:
861        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
862                cert_pwd=None, trusted_certs=None, caller=None):
863            self.log = log
864            self.debug = debug
865            self.cert_file = cert_file
866            self.cert_pwd = cert_pwd
867            self.trusted_certs = None
868            self.caller = caller
869            self.testbed = testbed
870
871        def __call__(self, uri, aid ):
872            req = {
873                    'allocID': {'fedid': aid }, 
874                }
875            self.log.info("Calling terminate segment")
876            try:
877                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
878                        self.trusted_certs)
879                self.log.info("Terminate segment succeeded")
880                return True
881            except service_error, e:
882                self.log.error("Terminate segment failed on %s: %s" % \
883                        (self.testbed, e))
884                return False
885
886    class info_segment(start_segment):
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None,
889                log_collector=None):
890            experiment_control_local.start_segment.__init__(self, debug, 
891                    log, testbed, cert_file, cert_pwd, trusted_certs, 
892                    caller, log_collector)
893
894        def __call__(self, uri, aid):
895            req = { 'allocID': { 'fedid' : aid } }
896
897            try:
898                self.log.debug("Calling InfoSegment at %s " % uri)
899                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
900                        self.trusted_certs)
901                if r.has_key('InfoSegmentResponseBody'):
902                    self.make_map(r['InfoSegmentResponseBody'])
903                    if 'proof' in r: self.proof = r['proof']
904                    self.response = r
905                else:
906                    raise service_error(service_error.internal, 
907                            "Bad response!?: %s" %r)
908                return True
909            except service_error, e:
910                self.log.error("Info segment failed on %s: %s" % \
911                        (self.testbed, e))
912                return False
913
914    class operation_segment:
915        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
916                cert_pwd=None, trusted_certs=None, caller=None,
917                log_collector=None):
918            self.log = log
919            self.debug = debug
920            self.cert_file = cert_file
921            self.cert_pwd = cert_pwd
922            self.trusted_certs = None
923            self.caller = caller
924            self.testbed = testbed
925            self.status = None
926
927        def __call__(self, uri, aid, op, targets, params):
928            req = { 
929                    'allocID': { 'fedid' : aid },
930                    'operation': op,
931                    'target': targets,
932                    }
933            if params: req['parameter'] = params
934
935
936            try:
937                self.log.debug("Calling OperationSegment at %s " % uri)
938                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
939                        self.trusted_certs)
940                if 'OperationSegmentResponseBody' in r:
941                    r = r['OperationSegmentResponseBody']
942                    if 'status' in r:
943                        self.status = r['status']
944                else:
945                    raise service_error(service_error.internal, 
946                            "Bad response!?: %s" %r)
947                return True
948            except service_error, e:
949                self.log.error("Operation segment failed on %s: %s" % \
950                        (self.testbed, e))
951                return False
952
953    def annotate_topology(self, top, data):
954        # These routines do various parts of the annotation
955        def add_new_names(nl, l):
956            """ add any names in nl to the list in l """
957            for n in nl:
958                if n not in l: l.append(n)
959       
960        def merge_services(ne, e):
961            for ns in ne.service:
962                # NB: the else is on the for
963                for s in e.service:
964                    if ns.name == s.name:
965                        s.importer = ns.importer
966                        s.param = ns.param
967                        s.description = ns.description
968                        s.status = ns.status
969                        break
970                else:
971                    e.service.append(ns)
972       
973        def merge_oses(ne, e):
974            """
975            Merge the operating system entries of ne into e
976            """
977            for nos in ne.os:
978                # NB: the else is on the for
979                for os in e.os:
980                    if nos.name == os.name:
981                        os.version = nos.version
982                        os.version = nos.distribution
983                        os.version = nos.distributionversion
984                        for a in nos.attribute:
985                            if os.get_attribute(a.attribute):
986                                os.remove_attribute(a.attribute)
987                            os.set_attribute(a.attribute, a.value)
988                        break
989                else:
990                    # If both nodes have one OS, this is a replacement
991                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
992                    else: e.os.append(nos)
993
994        # Annotate the topology with embedding info
995        for e in top.elements:
996            if isinstance(e, topdl.Computer):
997                for s in data:
998                    ne = s.node.get(e.name, None)
999                    if ne is not None:
1000                        add_new_names(ne.localname, e.localname)
1001                        e.status = ne.status
1002                        merge_services(ne, e)
1003                        add_new_names(ne.operation, e.operation)
1004                        if ne.os: merge_oses(ne, e)
1005                        break
1006            elif isinstance(e,topdl.Testbed):
1007                for s in data:
1008                    ne = s.tb.get(e.uri, None)
1009                    if ne is not None:
1010                        add_new_names(ne.localname, e.localname)
1011                        add_new_names(ne.operation, e.operation)
1012                        merge_services(ne, e)
1013                        for a in ne.attribute:
1014                            e.set_attribute(a.attribute, a.value)
1015        # Annotate substrates
1016        for s in top.substrates:
1017            for d in data:
1018                ss = d.subs.get(s.name, None)
1019                if ss is not None:
1020                    if ss.capacity is not None:
1021                        s.capacity = ss.capacity
1022                    if s.latency is not None:
1023                        s.latency = ss.latency
1024
1025
1026
1027    def allocate_resources(self, allocated, masters, eid, expid, 
1028            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1029            attrs=None, connInfo={}, tbmap=None, expcert=None):
1030
1031        started = { }           # Testbeds where a sub-experiment started
1032                                # successfully
1033
1034        # XXX
1035        fail_soft = False
1036
1037        if tbmap is None: tbmap = { }
1038
1039        log = alloc_log or self.log
1040
1041        tp = thread_pool(self.nthreads)
1042        threads = [ ]
1043        starters = [ ]
1044
1045        if expcert:
1046            cert = expcert
1047            pw = None
1048        else:
1049            cert = self.cert_file
1050            pw = self.cert_pwd
1051
1052        for tb in allocated.keys():
1053            # Create and start a thread to start the segment, and save it
1054            # to get the return value later
1055            tb_attrs = copy.copy(attrs)
1056            tp.wait_for_slot()
1057            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1058            base, suffix = split_testbed(tb)
1059            if suffix:
1060                tb_attrs.append({'attribute': 'experiment_name', 
1061                    'value': "%s-%s" % (eid, suffix)})
1062            else:
1063                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1064            if not uri:
1065                raise service_error(service_error.internal, 
1066                        "Unknown testbed %s !?" % tb)
1067
1068            aid = tbparams[tb].allocID
1069            if not aid:
1070                raise service_error(service_error.internal, 
1071                        "No alloc id for testbed %s !?" % tb)
1072
1073            s = self.start_segment(log=log, debug=self.debug,
1074                    testbed=tb, cert_file=cert,
1075                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1076                    caller=self.call_StartSegment,
1077                    log_collector=log_collector)
1078            starters.append(s)
1079            t  = pooled_thread(\
1080                    target=s, name=tb,
1081                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1082                    pdata=tp, trace_file=self.trace_file)
1083            threads.append(t)
1084            t.start()
1085
1086        # Wait until all finish (keep pinging the log, though)
1087        mins = 0
1088        revoked = False
1089        while not tp.wait_for_all_done(60.0):
1090            mins += 1
1091            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1092                    % mins)
1093            if not revoked and \
1094                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1095                # a testbed has failed.  Revoke this experiment's
1096                # synchronizarion values so that sub experiments will not
1097                # deadlock waiting for synchronization that will never happen
1098                self.log.info("A subexperiment has failed to swap in, " + \
1099                        "revoking synch keys")
1100                var_key = "fedid:%s" % expid
1101                for k in self.synch_store.all_keys():
1102                    if len(k) > 45 and k[0:46] == var_key:
1103                        self.synch_store.revoke_key(k)
1104                revoked = True
1105
1106        failed = [ t.getName() for t in threads if not t.rv ]
1107        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1108
1109        # If one failed clean up, unless fail_soft is set
1110        if failed:
1111            if not fail_soft:
1112                tp.clear()
1113                for tb in succeeded:
1114                    # Create and start a thread to stop the segment
1115                    tp.wait_for_slot()
1116                    uri = tbparams[tb].uri
1117                    t  = pooled_thread(\
1118                            target=self.terminate_segment(log=log,
1119                                testbed=tb,
1120                                cert_file=cert, 
1121                                cert_pwd=pw,
1122                                trusted_certs=self.trusted_certs,
1123                                caller=self.call_TerminateSegment),
1124                            args=(uri, tbparams[tb].allocID),
1125                            name=tb,
1126                            pdata=tp, trace_file=self.trace_file)
1127                    t.start()
1128                # Wait until all finish (if any are being stopped)
1129                if succeeded:
1130                    tp.wait_for_all_done()
1131
1132                # release the allocations
1133                for tb in tbparams.keys():
1134                    try:
1135                        self.release_access(tb, tbparams[tb].allocID, 
1136                                tbmap=tbmap, uri=tbparams[tb].uri,
1137                                cert_file=cert, cert_pwd=pw)
1138                    except service_error, e:
1139                        self.log.warn("Error releasing access: %s" % e.desc)
1140                # Remove the placeholder
1141                self.state_lock.acquire()
1142                self.state[eid].status = 'failed'
1143                self.state[eid].updated()
1144                if self.state_filename: self.write_state()
1145                self.state_lock.release()
1146                # Remove the repo dir
1147                self.remove_dirs("%s/%s" %(self.repodir, expid))
1148                # Walk up tmpdir, deleting as we go
1149                if self.cleanup:
1150                    self.remove_dirs(tmpdir)
1151                else:
1152                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1153
1154
1155                log.error("Swap in failed on %s" % ",".join(failed))
1156                return
1157        else:
1158            # Walk through the successes and gather the proofs
1159            proofs = { }
1160            for s in starters:
1161                if s.proof:
1162                    proofs[s.testbed] = s.proof
1163            self.annotate_topology(top, starters)
1164            log.info("[start_segment]: Experiment %s active" % eid)
1165
1166
1167        # Walk up tmpdir, deleting as we go
1168        if self.cleanup:
1169            self.remove_dirs(tmpdir)
1170        else:
1171            log.debug("[start_experiment]: not removing %s" % tmpdir)
1172
1173        # Insert the experiment into our state and update the disk copy.
1174        self.state_lock.acquire()
1175        self.state[expid].status = 'active'
1176        self.state[eid] = self.state[expid]
1177        self.state[eid].top = top
1178        self.state[eid].updated()
1179        # Append startup proofs
1180        for f in self.state[eid].get_all_allocations():
1181            if f.tb in proofs:
1182                f.proof.append(proofs[f.tb])
1183
1184        if self.state_filename: self.write_state()
1185        self.state_lock.release()
1186        return
1187
1188
1189    def add_kit(self, e, kit):
1190        """
1191        Add a Software object created from the list of (install, location)
1192        tuples passed as kit  to the software attribute of an object e.  We
1193        do this enough to break out the code, but it's kind of a hack to
1194        avoid changing the old tuple rep.
1195        """
1196
1197        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1198
1199        if isinstance(e.software, list): e.software.extend(s)
1200        else: e.software = s
1201
1202    def append_experiment_authorization(self, expid, attrs, 
1203            need_state_lock=True):
1204        """
1205        Append the authorization information to system state
1206        """
1207
1208        for p, a in attrs:
1209            if p and a:
1210                # Improperly configured overrides can add bad principals.
1211                self.auth.set_attribute(p, a)
1212            else:
1213                self.log.debug("Bad append experiment_authorization %s %s" \
1214                        % (p, a))
1215        self.auth.save()
1216
1217        if need_state_lock: self.state_lock.acquire()
1218        # XXX: really a no op?
1219        #self.state[expid]['auth'].update(attrs)
1220        if self.state_filename: self.write_state()
1221        if need_state_lock: self.state_lock.release()
1222
1223    def clear_experiment_authorization(self, expid, need_state_lock=True):
1224        """
1225        Attrs is a set of attribute principal pairs that need to be removed
1226        from the authenticator.  Remove them and save the authenticator.
1227        """
1228
1229        if need_state_lock: self.state_lock.acquire()
1230        # XXX: should be a no-op
1231        #if expid in self.state and 'auth' in self.state[expid]:
1232            #for p, a in self.state[expid]['auth']:
1233                #self.auth.unset_attribute(p, a)
1234            #self.state[expid]['auth'] = set()
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237        self.auth.save()
1238
1239
1240    def create_experiment_state(self, fid, req, expid, expcert,
1241            state='starting'):
1242        """
1243        Create the initial entry in the experiment's state.  The expid and
1244        expcert are the experiment's fedid and certifacte that represents that
1245        ID, which are installed in the experiment state.  If the request
1246        includes a suggested local name that is used if possible.  If the local
1247        name is already taken by an experiment owned by this user that has
1248        failed, it is overwritten.  Otherwise new letters are added until a
1249        valid localname is found.  The generated local name is returned.
1250        """
1251
1252        if req.has_key('experimentID') and \
1253                req['experimentID'].has_key('localname'):
1254            overwrite = False
1255            eid = req['experimentID']['localname']
1256            # If there's an old failed experiment here with the same local name
1257            # and accessible by this user, we'll overwrite it, otherwise we'll
1258            # fall through and do the collision avoidance.
1259            old_expid = self.get_experiment_fedid(eid)
1260            if old_expid:
1261                users_experiment = True
1262                try:
1263                    self.check_experiment_access(fid, old_expid)
1264                except service_error, e:
1265                    if e.code == service_error.access: users_experiment = False
1266                    else: raise e
1267                if users_experiment:
1268                    self.state_lock.acquire()
1269                    status = self.state[eid].status
1270                    if status and status == 'failed':
1271                        # remove the old access attributes
1272                        self.clear_experiment_authorization(eid,
1273                                need_state_lock=False)
1274                        overwrite = True
1275                        del self.state[eid]
1276                        del self.state[old_expid]
1277                    self.state_lock.release()
1278                else:
1279                    self.log.info('Experiment %s exists, ' % eid + \
1280                            'but this user cannot access it')
1281            self.state_lock.acquire()
1282            while (self.state.has_key(eid) and not overwrite):
1283                eid += random.choice(string.ascii_letters)
1284            # Initial state
1285            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1286                    identity=expcert)
1287            self.state[expid] = self.state[eid]
1288            if self.state_filename: self.write_state()
1289            self.state_lock.release()
1290        else:
1291            eid = self.exp_stem
1292            for i in range(0,5):
1293                eid += random.choice(string.ascii_letters)
1294            self.state_lock.acquire()
1295            while (self.state.has_key(eid)):
1296                eid = self.exp_stem
1297                for i in range(0,5):
1298                    eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305
1306        # Let users touch the state.  Authorize this fid and the expid itself
1307        # to touch the experiment, as well as allowing th eoverrides.
1308        self.append_experiment_authorization(eid, 
1309                set([(fid, expid), (expid,expid)] + \
1310                        [ (o, expid) for o in self.overrides]))
1311
1312        return eid
1313
1314
1315    def allocate_ips_to_topo(self, top):
1316        """
1317        Add an ip4_address attribute to all the hosts in the topology that have
1318        not already been assigned one by the user, based on the shared
1319        substrates on which they sit.  An /etc/hosts file is also created and
1320        returned as a list of hostfiles entries.  We also return the allocator,
1321        because we may need to allocate IPs to portals
1322        (specifically DRAGON portals).
1323        """
1324        subs = sorted(top.substrates, 
1325                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1326                reverse=True)
1327        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1328        ifs = { }
1329        hosts = [ ]
1330
1331        assigned = { }
1332        assigned_subs = set()
1333        # Look for IP networks that were already assigned to the experiment by
1334        # the user.  We assume that users are smart in making that assignment,
1335        # but we check them later.
1336        for e in top.elements:
1337            if not isinstance(e, topdl.Computer): continue
1338            for inf in e.interface:
1339                a = inf.get_attribute('ip4_address')
1340                if not a: continue
1341                for s in inf.substrate:
1342                    assigned_subs.add(s)
1343                a = ip_addr(a)
1344                n = ip_addr(inf.get_attribute('ip4_netmask'))
1345
1346                sz = 0x100000000 - long(n)
1347                net = (long(a) & long(n)) & 0xffffffff
1348                if net in assigned:
1349                    if sz > assigned[net]: assigned[net] = sz
1350                assigned[net] = sz
1351
1352        # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER
1353        assigned[long(ip_addr('10.0.23.0'))] = 256
1354        # end of hack
1355
1356        # Walk all substrates, smallest to largest, assigning IP addresses to
1357        # the unassigned and checking the assigned.  Build an /etc/hosts file
1358        # as well.
1359        for idx, s in enumerate(subs):
1360            if s.name not in assigned_subs:
1361                net_size = len(s.interfaces)+2
1362
1363                # Get an ip allocation for this substrate.  Make sure the
1364                # allocation does not collide with any user allocations.
1365                ok = False
1366                while not ok:
1367                    a = ips.allocate(net_size)
1368                    if not a:
1369                        raise service_error(service_error.req, 
1370                                "Cannot allocate IP addresses")
1371                    base, num = a
1372                    if num < net_size: 
1373                        raise service_error(service_error.internal,
1374                                "Allocator returned wrong number of IPs??")
1375                    # Check for overlap.  An assigned network could contain an
1376                    # endpoint of the new allocation or the new allocation
1377                    # could contain an endpoint of an assigned network.
1378                    # NB the else is attached to the for - if the loop is not
1379                    # exited by a break, ok = True.
1380                    for n, sz in assigned.items():
1381                        if base >= n and base < n + sz:
1382                            ok = False
1383                            break
1384                        if base + num > n and base + num  < n + sz:
1385                            ok = False
1386                            break
1387                        if n >= base and n < base + num:
1388                            ok = False
1389                            break
1390                        if n+sz > base and n+sz < base + num:
1391                            ok = False
1392                            break
1393                    else:
1394                        ok = True
1395                           
1396                mask = ips.min_alloc
1397                while mask < net_size:
1398                    mask *= 2
1399
1400                netmask = ((2**32-1) ^ (mask-1))
1401            else:
1402                base = 0
1403
1404            # Add the attributes and build hosts
1405            base += 1
1406            for i in s.interfaces:
1407                # Add address and masks to interfaces on unassigned substrates
1408                if s.name not in assigned_subs:
1409                    i.attribute.append(
1410                            topdl.Attribute('ip4_address', 
1411                                "%s" % ip_addr(base)))
1412                    i.attribute.append(
1413                            topdl.Attribute('ip4_netmask', 
1414                                "%s" % ip_addr(int(netmask))))
1415
1416                # Make /etc/hosts entries
1417                addr = i.get_attribute('ip4_address')
1418                if addr is None:
1419                    raise service_error(service_error.req, 
1420                            "Partially assigned substrate %s" %s.name)
1421                hname = i.element.name
1422                if hname in ifs:
1423                    hosts.append("%s\t%s-%s %s-%d" % \
1424                            (addr, hname, s.name, hname, ifs[hname]))
1425                else:
1426                    # First IP allocated it the default ip according to hosts,
1427                    # so the extra alias is here.
1428                    ifs[hname] = 0
1429                    hosts.append("%s\t%s-%s %s-%d %s" % \
1430                            (addr, hname, s.name, hname, ifs[hname], hname))
1431
1432                ifs[hname] += 1
1433                base += 1
1434        return hosts, ips
1435
1436    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1437            tbparam, masters, tbmap, expid=None, expcert=None):
1438        for tb in testbeds:
1439            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1440                    expcert)
1441            allocated[tb] = 1
1442
1443    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1444            expcert=None):
1445        """
1446        Get access to testbed through fedd and set the parameters for that tb
1447        """
1448        def get_export_project(svcs):
1449            """
1450            Look through for the list of federated_service for this testbed
1451            objects for a project_export service, and extract the project
1452            parameter.
1453            """
1454
1455            pe = [s for s in svcs if s.name=='project_export']
1456            if len(pe) == 1:
1457                return pe[0].params.get('project', None)
1458            elif len(pe) == 0:
1459                return None
1460            else:
1461                raise service_error(service_error.req,
1462                        "More than one project export is not supported")
1463
1464        def add_services(svcs, type, slist, keys):
1465            """
1466            Add the given services to slist.  type is import or export.  Also
1467            add a mapping entry from the assigned id to the original service
1468            record.
1469            """
1470            for i, s in enumerate(svcs):
1471                idx = '%s%d' % (type, i)
1472                keys[idx] = s
1473                sr = {'id': idx, 'name': s.name, 'visibility': type }
1474                if s.params:
1475                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1476                            for k, v in s.params.items()]
1477                slist.append(sr)
1478
1479        uri = tbmap.get(testbed_base(tb), None)
1480        if not uri:
1481            raise service_error(service_error.server_config, 
1482                    "Unknown testbed: %s" % tb)
1483
1484        export_svcs = masters.get(tb,[])
1485        import_svcs = [ s for m in masters.values() \
1486                for s in m \
1487                    if tb in s.importers ]
1488
1489        export_project = get_export_project(export_svcs)
1490        # Compose the credential list so that IDs come before attributes
1491        creds = set()
1492        keys = set()
1493        certs = self.auth.get_creds_for_principal(fid)
1494        # Append credenials about this experiment controller - e.g. that it is
1495        # trusted.
1496        certs.update(self.auth.get_creds_for_principal(
1497            fedid(file=self.cert_file)))
1498        if expid:
1499            certs.update(self.auth.get_creds_for_principal(expid))
1500        for c in certs:
1501            keys.add(c.issuer_cert())
1502            creds.add(c.attribute_cert())
1503        creds = list(keys) + list(creds)
1504
1505        if expcert: cert, pw = expcert, None
1506        else: cert, pw = self.cert_file, self.cert_pw
1507
1508        # Request credentials
1509        req = {
1510                'abac_credential': creds,
1511            }
1512        # Make the service request from the services we're importing and
1513        # exporting.  Keep track of the export request ids so we can
1514        # collect the resulting info from the access response.
1515        e_keys = { }
1516        if import_svcs or export_svcs:
1517            slist = []
1518            add_services(import_svcs, 'import', slist, e_keys)
1519            add_services(export_svcs, 'export', slist, e_keys)
1520            req['service'] = slist
1521
1522        if self.local_access.has_key(uri):
1523            # Local access call
1524            req = { 'RequestAccessRequestBody' : req }
1525            r = self.local_access[uri].RequestAccess(req, 
1526                    fedid(file=self.cert_file))
1527            r = { 'RequestAccessResponseBody' : r }
1528        else:
1529            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1530
1531        if r.has_key('RequestAccessResponseBody'):
1532            # Through to here we have a valid response, not a fault.
1533            # Access denied is a fault, so something better or worse than
1534            # access denied has happened.
1535            r = r['RequestAccessResponseBody']
1536            self.log.debug("[get_access] Access granted")
1537        else:
1538            raise service_error(service_error.protocol,
1539                        "Bad proxy response")
1540        if 'proof' not in r:
1541            raise service_error(service_error.protocol,
1542                        "Bad access response (no access proof)")
1543
1544        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1545                tb=tb, uri=uri, proof=[r['proof']], 
1546                services=masters.get(tb, None))
1547
1548        # Collect the responses corresponding to the services this testbed
1549        # exports.  These will be the service requests that we will include in
1550        # the start segment requests (with appropriate visibility values) to
1551        # import and export the segments.
1552        for s in r.get('service', []):
1553            id = s.get('id', None)
1554            # Note that this attaches the response to the object in the masters
1555            # data structure.  (The e_keys index disappears when this fcn
1556            # returns)
1557            if id and id in e_keys:
1558                e_keys[id].reqs.append(s)
1559
1560        # Add attributes to parameter space.  We don't allow attributes to
1561        # overlay any parameters already installed.
1562        for a in r.get('fedAttr', []):
1563            try:
1564                if a['attribute']:
1565                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1566            except KeyError:
1567                self.log.error("Bad attribute in response: %s" % a)
1568
1569
1570    def split_topology(self, top, topo, testbeds):
1571        """
1572        Create the sub-topologies that are needed for experiment instantiation.
1573        """
1574        for tb in testbeds:
1575            topo[tb] = top.clone()
1576            # copy in for loop allows deletions from the original
1577            for e in [ e for e in topo[tb].elements]:
1578                etb = e.get_attribute('testbed')
1579                # NB: elements without a testbed attribute won't appear in any
1580                # sub topologies. 
1581                if not etb or etb != tb:
1582                    for i in e.interface:
1583                        for s in i.subs:
1584                            try:
1585                                s.interfaces.remove(i)
1586                            except ValueError:
1587                                raise service_error(service_error.internal,
1588                                        "Can't remove interface??")
1589                    topo[tb].elements.remove(e)
1590            topo[tb].make_indices()
1591
1592    def confirm_software(self, top):
1593        """
1594        Make sure that the software to be loaded in the topo is all available
1595        before we begin making access requests, etc.  This is a subset of
1596        wrangle_software.
1597        """
1598        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1599        pkgs.update([x.location for e in top.elements for x in e.software])
1600
1601        for pkg in pkgs:
1602            loc = pkg
1603
1604            scheme, host, path = urlparse(loc)[0:3]
1605            dest = os.path.basename(path)
1606            if not scheme:
1607                if not loc.startswith('/'):
1608                    loc = "/%s" % loc
1609                loc = "file://%s" %loc
1610            # NB: if scheme was found, loc == pkg
1611            try:
1612                u = urlopen(loc)
1613                u.close()
1614            except Exception, e:
1615                raise service_error(service_error.req, 
1616                        "Cannot open %s: %s" % (loc, e))
1617        return True
1618
1619    def wrangle_software(self, expid, top, topo, tbparams):
1620        """
1621        Copy software out to the repository directory, allocate permissions and
1622        rewrite the segment topologies to look for the software in local
1623        places.
1624        """
1625
1626        # Copy the rpms and tarfiles to a distribution directory from
1627        # which the federants can retrieve them
1628        linkpath = "%s/software" %  expid
1629        softdir ="%s/%s" % ( self.repodir, linkpath)
1630        softmap = { }
1631
1632        # self.fedkit and self.gateway kit are lists of tuples of
1633        # (install_location, download_location) this extracts the download
1634        # locations.
1635        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1636        pkgs.update([x.location for e in top.elements for x in e.software])
1637        try:
1638            os.makedirs(softdir)
1639        except EnvironmentError, e:
1640            raise service_error(
1641                    "Cannot create software directory: %s" % e)
1642        # The actual copying.  Everything's converted into a url for copying.
1643        auth_attrs = set()
1644        for pkg in pkgs:
1645            loc = pkg
1646
1647            scheme, host, path = urlparse(loc)[0:3]
1648            dest = os.path.basename(path)
1649            if not scheme:
1650                if not loc.startswith('/'):
1651                    loc = "/%s" % loc
1652                loc = "file://%s" %loc
1653            # NB: if scheme was found, loc == pkg
1654            try:
1655                u = urlopen(loc)
1656            except Exception, e:
1657                raise service_error(service_error.req, 
1658                        "Cannot open %s: %s" % (loc, e))
1659            try:
1660                f = open("%s/%s" % (softdir, dest) , "w")
1661                self.log.debug("Writing %s/%s" % (softdir,dest) )
1662                data = u.read(4096)
1663                while data:
1664                    f.write(data)
1665                    data = u.read(4096)
1666                f.close()
1667                u.close()
1668            except Exception, e:
1669                raise service_error(service_error.internal,
1670                        "Could not copy %s: %s" % (loc, e))
1671            path = re.sub("/tmp", "", linkpath)
1672            # XXX
1673            softmap[pkg] = \
1674                    "%s/%s/%s" %\
1675                    ( self.repo_url, path, dest)
1676
1677            # Allow the individual segments to access the software by assigning
1678            # an attribute to each testbed allocation that encodes the data to
1679            # be released.  This expression collects the data for each run of
1680            # the loop.
1681            auth_attrs.update([
1682                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1683                        for tb in tbparams.keys()])
1684
1685        self.append_experiment_authorization(expid, auth_attrs)
1686
1687        # Convert the software locations in the segments into the local
1688        # copies on this host
1689        for soft in [ s for tb in topo.values() \
1690                for e in tb.elements \
1691                    if getattr(e, 'software', False) \
1692                        for s in e.software ]:
1693            if softmap.has_key(soft.location):
1694                soft.location = softmap[soft.location]
1695
1696
1697    def new_experiment(self, req, fid):
1698        """
1699        The external interface to empty initial experiment creation called from
1700        the dispatcher.
1701
1702        Creates a working directory, splits the incoming description using the
1703        splitter script and parses out the avrious subsections using the
1704        lcasses above.  Once each sub-experiment is created, use pooled threads
1705        to instantiate them and start it all up.
1706        """
1707        self.log.info("New experiment call started for %s" % fid)
1708        req = req.get('NewRequestBody', None)
1709        if not req:
1710            raise service_error(service_error.req,
1711                    "Bad request format (no NewRequestBody)")
1712
1713        # import may partially succeed so always save credentials and warn
1714        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1715            self.log.debug("Failed to import delegation credentials(!)")
1716        self.get_grouper_updates(fid)
1717        self.auth.update()
1718        self.auth.save()
1719
1720        try:
1721            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1722                    with_proof=True)
1723        except service_error, e:
1724            self.log.info("New experiment call for %s: access denied" % fid)
1725            raise e
1726
1727
1728        if not access_ok:
1729            self.log.info("New experiment call for %s: Access denied" % fid)
1730            raise service_error(service_error.access, "New access denied",
1731                    proof=[proof])
1732
1733        try:
1734            tmpdir = tempfile.mkdtemp(prefix="split-")
1735        except EnvironmentError:
1736            raise service_error(service_error.internal, "Cannot create tmp dir")
1737
1738        # Generate an ID for the experiment (slice) and a certificate that the
1739        # allocator can use to prove they own it.  We'll ship it back through
1740        # the encrypted connection.  If the requester supplied one, use it.
1741        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1742            expcert = req['experimentAccess']['X509']
1743            expid = fedid(certstr=expcert)
1744            self.state_lock.acquire()
1745            if expid in self.state:
1746                self.state_lock.release()
1747                raise service_error(service_error.req, 
1748                        'fedid %s identifies an existing experiment' % expid)
1749            self.state_lock.release()
1750        else:
1751            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1752
1753        #now we're done with the tmpdir, and it should be empty
1754        if self.cleanup:
1755            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1756            os.rmdir(tmpdir)
1757        else:
1758            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1759
1760        eid = self.create_experiment_state(fid, req, expid, expcert, 
1761                state='empty')
1762
1763        rv = {
1764                'experimentID': [
1765                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1766                ],
1767                'experimentStatus': 'empty',
1768                'experimentAccess': { 'X509' : expcert },
1769                'proof': proof.to_dict(),
1770            }
1771
1772        self.log.info("New experiment call succeeded for %s" % fid)
1773        return rv
1774
1775    # create_experiment sub-functions
1776
1777    @staticmethod
1778    def get_experiment_key(req, field='experimentID'):
1779        """
1780        Parse the experiment identifiers out of the request (the request body
1781        tag has been removed).  Specifically this pulls either the fedid or the
1782        localname out of the experimentID field.  A fedid is preferred.  If
1783        neither is present or the request does not contain the fields,
1784        service_errors are raised.
1785        """
1786        # Get the experiment access
1787        exp = req.get(field, None)
1788        if exp:
1789            if exp.has_key('fedid'):
1790                key = exp['fedid']
1791            elif exp.has_key('localname'):
1792                key = exp['localname']
1793            else:
1794                raise service_error(service_error.req, "Unknown lookup type")
1795        else:
1796            raise service_error(service_error.req, "No request?")
1797
1798        return key
1799
1800    def get_experiment_ids_and_start(self, key, tmpdir):
1801        """
1802        Get the experiment name, id and access certificate from the state, and
1803        set the experiment state to 'starting'.  returns a triple (fedid,
1804        localname, access_cert_file). The access_cert_file is a copy of the
1805        contents of the access certificate, created in the tempdir with
1806        restricted permissions.  If things are confused, raise an exception.
1807        """
1808
1809        expid = eid = None
1810        self.state_lock.acquire()
1811        if key in self.state:
1812            exp = self.state[key]
1813            exp.status = "starting"
1814            exp.updated()
1815            expid = exp.fedid
1816            eid = exp.localname
1817            expcert = exp.identity
1818        self.state_lock.release()
1819
1820        # make a protected copy of the access certificate so the experiment
1821        # controller can act as the experiment principal.
1822        if expcert:
1823            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1824            if not expcert_file:
1825                raise service_error(service_error.internal, 
1826                        "Cannot create temp cert file?")
1827        else:
1828            expcert_file = None
1829
1830        return (eid, expid, expcert_file)
1831
1832    def get_topology(self, req, tmpdir):
1833        """
1834        Get the ns2 content and put it into a file for parsing.  Call the local
1835        or remote parser and return the topdl.Topology.  Errors result in
1836        exceptions.  req is the request and tmpdir is a work directory.
1837        """
1838
1839        # The tcl parser needs to read a file so put the content into that file
1840        descr=req.get('experimentdescription', None)
1841        if descr:
1842            if 'ns2description' in descr:
1843                file_content=descr['ns2description']
1844            elif 'topdldescription' in descr:
1845                return topdl.Topology(**descr['topdldescription'])
1846            else:
1847                raise service_error(service_error.req, 
1848                        'Unknown experiment description type')
1849        else:
1850            raise service_error(service_error.req, "No experiment description")
1851
1852
1853        if self.splitter_url:
1854            self.log.debug("Calling remote topdl translator at %s" % \
1855                    self.splitter_url)
1856            top = self.remote_ns2topdl(self.splitter_url, file_content)
1857        else:
1858            tclfile = os.path.join(tmpdir, "experiment.tcl")
1859            if file_content:
1860                try:
1861                    f = open(tclfile, 'w')
1862                    f.write(file_content)
1863                    f.close()
1864                except EnvironmentError:
1865                    raise service_error(service_error.internal,
1866                            "Cannot write temp experiment description")
1867            else:
1868                raise service_error(service_error.req, 
1869                        "Only ns2descriptions supported")
1870            pid = "dummy"
1871            gid = "dummy"
1872            eid = "dummy"
1873
1874            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1875                str(self.muxmax), '-m', 'dummy']
1876
1877            tclcmd.extend([pid, gid, eid, tclfile])
1878
1879            self.log.debug("running local splitter %s", " ".join(tclcmd))
1880            # This is just fantastic.  As a side effect the parser copies
1881            # tb_compat.tcl into the current directory, so that directory
1882            # must be writable by the fedd user.  Doing this in the
1883            # temporary subdir ensures this is the case.
1884            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1885                    cwd=tmpdir)
1886            split_data = tclparser.stdout
1887
1888            top = topdl.topology_from_xml(file=split_data, top="experiment")
1889            os.remove(tclfile)
1890
1891        return top
1892
1893    def get_testbed_services(self, req, testbeds):
1894        """
1895        Parse the services section of the request into two dicts mapping
1896        testbed to lists of federated_service objects.  The first dict maps all
1897        exporters of services to those service objects, the second maps
1898        testbeds to service objects only for services requiring portals.
1899        """
1900
1901        # Sanity check the services.  Exports or imports from unknown testbeds
1902        # cause an exception.
1903        for s in req.get('service', []):
1904            for t in s.get('import', []):
1905                if t not in testbeds:
1906                    raise service_error(service_error.req, 
1907                            'Service import to unknown testbed: %s' %t)
1908            for t in s.get('export', []):
1909                if t not in testbeds:
1910                    raise service_error(service_error.req, 
1911                            'Service export from unknown testbed: %s' %t)
1912
1913
1914        # We construct both dicts here because deriving the second is more
1915        # complex than it looks - both the keys and lists can differ, and it's
1916        # much easier to generate both in one pass.
1917        masters = { }
1918        pmasters = { }
1919        for s in req.get('service', []):
1920            # If this is a service request with the importall field
1921            # set, fill it out.
1922
1923            if s.get('importall', False):
1924                s['import'] = [ tb for tb in testbeds \
1925                        if tb not in s.get('export',[])]
1926                del s['importall']
1927
1928            # Add the service to masters
1929            for tb in s.get('export', []):
1930                if s.get('name', None):
1931
1932                    params = { }
1933                    for a in s.get('fedAttr', []):
1934                        params[a.get('attribute', '')] = a.get('value','')
1935
1936                    fser = federated_service(name=s['name'],
1937                            exporter=tb, importers=s.get('import',[]),
1938                            params=params)
1939                    if fser.name == 'hide_hosts' \
1940                            and 'hosts' not in fser.params:
1941                        fser.params['hosts'] = \
1942                                ",".join(tb_hosts.get(fser.exporter, []))
1943                    if tb in masters: masters[tb].append(fser)
1944                    else: masters[tb] = [fser]
1945
1946                    if fser.portal:
1947                        if tb in pmasters: pmasters[tb].append(fser)
1948                        else: pmasters[tb] = [fser]
1949                else:
1950                    self.log.error('Testbed service does not have name " + \
1951                            "and importers')
1952        return masters, pmasters
1953
1954    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1955        """
1956        Create the ssh keys necessary for interconnecting the portal nodes and
1957        the global hosts file for letting each segment know about the IP
1958        addresses in play.  If we have computed static routes, copy them into
1959        the repo. Save these into the repo.  Add attributes to the autorizer
1960        allowing access controllers to download them and return a set of
1961        attributes that inform the segments where to find this stuff.  May
1962        raise service_errors in if there are problems.
1963        """
1964        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1965        gw_secretkey_base = "fed.%s" % self.ssh_type
1966        keydir = os.path.join(tmpdir, 'keys')
1967        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1968        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1969        route = os.path.join(tmpdir, 'route.tgz')
1970
1971        try:
1972            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1973        except ValueError:
1974            raise service_error(service_error.server_config, 
1975                    "Bad key type (%s)" % self.ssh_type)
1976
1977        self.generate_seer_certs(keydir)
1978
1979        # Copy configuration files into the remote file store
1980        # The config urlpath
1981        configpath = "/%s/config" % expid
1982        # The config file system location
1983        configdir ="%s%s" % ( self.repodir, configpath)
1984        route_conf = os.path.join(configdir, 'route.tgz')
1985        try:
1986            os.makedirs(configdir)
1987        except EnvironmentError, e:
1988            raise service_error(service_error.internal,
1989                    "Cannot create config directory: %s" % e)
1990        try:
1991            f = open("%s/hosts" % configdir, "w")
1992            print >> f, string.join(hosts, '\n')
1993            f.close()
1994        except EnvironmentError, e:
1995            raise service_error(service_error.internal, 
1996                    "Cannot write hosts file: %s" % e)
1997        try:
1998            if os.path.exists(route):
1999                copy_file(route, route_conf)
2000
2001            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
2002            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
2003            copy_file(os.path.join(keydir, 'ca.pem'), 
2004                    os.path.join(configdir, 'ca.pem'))
2005            copy_file(os.path.join(keydir, 'node.pem'), 
2006                    os.path.join(configdir, 'node.pem'))
2007        except EnvironmentError, e:
2008            raise service_error(service_error.internal, 
2009                    "Cannot copy keyfiles: %s" % e)
2010
2011        # Allow the individual testbeds to access the configuration files,
2012        # again by setting an attribute for the relevant pathnames on each
2013        # allocation principal.  Yeah, that's a long list comprehension.
2014        self.append_experiment_authorization(expid, set([
2015            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2016                    for tb in tbparams.keys() \
2017                        for f in ("hosts", 'ca.pem', 'node.pem', 'route.tgz',
2018                            gw_secretkey_base, gw_pubkey_base)]))
2019
2020        attrs = [ 
2021                {
2022                    'attribute': 'ssh_pubkey', 
2023                    'value': '%s/%s/config/%s' % \
2024                            (self.repo_url, expid, gw_pubkey_base)
2025                },
2026                {
2027                    'attribute': 'ssh_secretkey', 
2028                    'value': '%s/%s/config/%s' % \
2029                            (self.repo_url, expid, gw_secretkey_base)
2030                },
2031                {
2032                    'attribute': 'hosts', 
2033                    'value': '%s/%s/config/hosts' % \
2034                            (self.repo_url, expid)
2035                },
2036                {
2037                    'attribute': 'seer_ca_pem', 
2038                    'value': '%s/%s/config/%s' % \
2039                            (self.repo_url, expid, 'ca.pem')
2040                },
2041                {
2042                    'attribute': 'seer_node_pem', 
2043                    'value': '%s/%s/config/%s' % \
2044                            (self.repo_url, expid, 'node.pem')
2045                },
2046            ]
2047        # Add info about static routes if we have some
2048        if os.path.exists(route_conf):
2049            attrs.append(
2050                {
2051                    'attribute': 'route.tgz', 
2052                    'value': '%s/%s/config/%s' % \
2053                            (self.repo_url, expid, 'route.tgz')
2054                }
2055            )
2056        return attrs
2057
2058
2059    def get_vtopo(self, req, fid):
2060        """
2061        Return the stored virtual topology for this experiment
2062        """
2063        rv = None
2064        state = None
2065        self.log.info("vtopo call started for %s" %  fid)
2066
2067        req = req.get('VtopoRequestBody', None)
2068        if not req:
2069            raise service_error(service_error.req,
2070                    "Bad request format (no VtopoRequestBody)")
2071        exp = req.get('experiment', None)
2072        if exp:
2073            if exp.has_key('fedid'):
2074                key = exp['fedid']
2075                keytype = "fedid"
2076            elif exp.has_key('localname'):
2077                key = exp['localname']
2078                keytype = "localname"
2079            else:
2080                raise service_error(service_error.req, "Unknown lookup type")
2081        else:
2082            raise service_error(service_error.req, "No request?")
2083
2084        try:
2085            proof = self.check_experiment_access(fid, key)
2086        except service_error, e:
2087            self.log.info("vtopo call failed for %s: access denied" %  fid)
2088            raise e
2089
2090        self.state_lock.acquire()
2091        # XXX: this needs to be recalculated
2092        if key in self.state:
2093            if self.state[key].top is not None:
2094                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2095                rv = { 'experiment' : {keytype: key },
2096                        'vtopo': vtopo,
2097                        'proof': proof.to_dict(), 
2098                    }
2099            else:
2100                state = self.state[key].status
2101        self.state_lock.release()
2102
2103        if rv: 
2104            self.log.info("vtopo call completed for %s %s " % \
2105                (key, fid))
2106            return rv
2107        else: 
2108            if state:
2109                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2110                    (key, fid))
2111                raise service_error(service_error.partial, 
2112                        "Not ready: %s" % state)
2113            else:
2114                self.log.info("vtopo call completed for %s %s (No experiment)"\
2115                        % (key, fid))
2116                raise service_error(service_error.req, "No such experiment")
2117
2118    def get_vis(self, req, fid):
2119        """
2120        Return the stored visualization for this experiment
2121        """
2122        rv = None
2123        state = None
2124
2125        self.log.info("vis call started for %s" %  fid)
2126        req = req.get('VisRequestBody', None)
2127        if not req:
2128            raise service_error(service_error.req,
2129                    "Bad request format (no VisRequestBody)")
2130        exp = req.get('experiment', None)
2131        if exp:
2132            if exp.has_key('fedid'):
2133                key = exp['fedid']
2134                keytype = "fedid"
2135            elif exp.has_key('localname'):
2136                key = exp['localname']
2137                keytype = "localname"
2138            else:
2139                raise service_error(service_error.req, "Unknown lookup type")
2140        else:
2141            raise service_error(service_error.req, "No request?")
2142
2143        try:
2144            proof = self.check_experiment_access(fid, key)
2145        except service_error, e:
2146            self.log.info("vis call failed for %s: access denied" %  fid)
2147            raise e
2148
2149        self.state_lock.acquire()
2150        # Generate the visualization
2151        if key in self.state:
2152            if self.state[key].top is not None:
2153                try:
2154                    vis = self.genviz(
2155                            topdl.topology_to_vtopo(self.state[key].top))
2156                except service_error, e:
2157                    self.state_lock.release()
2158                    raise e
2159                rv =  { 'experiment' : {keytype: key },
2160                        'vis': vis,
2161                        'proof': proof.to_dict(), 
2162                        }
2163            else:
2164                state = self.state[key].status
2165        self.state_lock.release()
2166
2167        if rv: 
2168            self.log.info("vis call completed for %s %s " % \
2169                (key, fid))
2170            return rv
2171        else:
2172            if state:
2173                self.log.info("vis call completed for %s %s (not ready)" % \
2174                    (key, fid))
2175                raise service_error(service_error.partial, 
2176                        "Not ready: %s" % state)
2177            else:
2178                self.log.info("vis call completed for %s %s (no experiment)" % \
2179                    (key, fid))
2180                raise service_error(service_error.req, "No such experiment")
2181
2182    @staticmethod
2183    def needs_route_computation(req):
2184        '''
2185        Walk the request services looking for a static routing request
2186        '''
2187        for s in req.get('service', []):
2188            if s.get('name', '') == 'static_routing':
2189                return True
2190        return False
2191
2192    def compute_static_routes(self, tmpdir, top):
2193        '''
2194        Compute a set of static routes for the topology.  The result is a file
2195        called route.tgz in tmpdir that contains a dinrectory, route, with a
2196        file per node in the topology that lists the prefix and router for all
2197        the routes in that node's routing table.  Exceptions from the route
2198        calculation program and the tar creation call are not caught.
2199        '''
2200        if self.routing is None:
2201            raise service_error(service_error.server, 
2202                    'Cannot provide staticroutes, no routing program specified')
2203        rg = tempfile.NamedTemporaryFile()
2204        outdir = os.path.join(tmpdir, 'route')
2205        tarfile = os.path.join(tmpdir, 'route.tgz')
2206        topology_to_route_file(top, file=rg)
2207        os.mkdir(outdir)
2208        try:
2209            for cmd in (
2210                    [self.routing, '--input', rg.name,'--output', outdir],
2211                    ['tar', '-C', tmpdir, '-czf', tarfile, 'route']):
2212                subprocess.check_call(cmd)
2213        except subprocess.CalledProcessError, e:
2214            raise service_error(service_error.internal, 
2215                    'Cannot call %s: %s' % (' '.join(cmd), e.returncode))
2216        rg.close()
2217        shutil.rmtree(outdir)
2218
2219
2220   
2221    def save_federant_information(self, allocated, tbparams, eid, top):
2222        """
2223        Store the various data that have changed in the experiment state
2224        between when it was started and the beginning of resource allocation.
2225        This is basically the information about each local allocation.  This
2226        fills in the values of the placeholder allocation in the state.  It
2227        also collects the access proofs and returns them as dicts for a
2228        response message.
2229        """
2230        self.state_lock.acquire()
2231        exp = self.state[eid]
2232        exp.top = top.clone()
2233        # save federant information
2234        for k in allocated.keys():
2235            exp.add_allocation(tbparams[k])
2236            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2237                type="testbed", localname=[k], 
2238                service=[ s.to_topdl() for s in tbparams[k].services]))
2239
2240        # Access proofs for the response message
2241        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2242                    for p in tbparams[k].proof]
2243        exp.updated()
2244        if self.state_filename: 
2245            self.write_state()
2246        self.state_lock.release()
2247        return proofs
2248
2249    def clear_placeholder(self, eid, expid, tmpdir):
2250        """
2251        Clear the placeholder and remove any allocated temporary dir.
2252        """
2253
2254        self.state_lock.acquire()
2255        del self.state[eid]
2256        del self.state[expid]
2257        if self.state_filename: self.write_state()
2258        self.state_lock.release()
2259        if tmpdir and self.cleanup:
2260            self.remove_dirs(tmpdir)
2261
2262    # end of create_experiment sub-functions
2263
2264    def create_experiment(self, req, fid):
2265        """
2266        The external interface to experiment creation called from the
2267        dispatcher.
2268
2269        Creates a working directory, splits the incoming description using the
2270        splitter script and parses out the various subsections using the
2271        classes above.  Once each sub-experiment is created, use pooled threads
2272        to instantiate them and start it all up.
2273        """
2274
2275        self.log.info("Create experiment call started for %s" % fid)
2276        req = req.get('CreateRequestBody', None)
2277        if req:
2278            key = self.get_experiment_key(req)
2279        else:
2280            raise service_error(service_error.req,
2281                    "Bad request format (no CreateRequestBody)")
2282
2283        # Import information from the requester
2284        # import may partially succeed so always save credentials and warn
2285        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2286            self.log.debug("Failed to import delegation credentials(!)")
2287        self.get_grouper_updates(fid)
2288        self.auth.update()
2289        self.auth.save()
2290
2291        try:
2292            # Make sure that the caller can talk to us
2293            proof = self.check_experiment_access(fid, key)
2294        except service_error, e:
2295            self.log.info("Create experiment call failed for %s: access denied"\
2296                    % fid)
2297            raise e
2298
2299
2300        # Install the testbed map entries supplied with the request into a copy
2301        # of the testbed map.
2302        tbmap = dict(self.tbmap)
2303        for m in req.get('testbedmap', []):
2304            if 'testbed' in m and 'uri' in m:
2305                tbmap[m['testbed']] = m['uri']
2306
2307        # a place to work
2308        try:
2309            tmpdir = tempfile.mkdtemp(prefix="split-")
2310            os.mkdir(tmpdir+"/keys")
2311        except EnvironmentError:
2312            raise service_error(service_error.internal, "Cannot create tmp dir")
2313
2314        tbparams = { }
2315
2316        eid, expid, expcert_file = \
2317                self.get_experiment_ids_and_start(key, tmpdir)
2318
2319        # This catches exceptions to clear the placeholder if necessary
2320        try: 
2321            if not (eid and expid):
2322                raise service_error(service_error.internal, 
2323                        "Cannot find local experiment info!?")
2324
2325            top = self.get_topology(req, tmpdir)
2326            self.confirm_software(top)
2327            # Assign the IPs
2328            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2329            if self.needs_route_computation(req):
2330                self.compute_static_routes(tmpdir, top)
2331            # Find the testbeds to look up
2332            tb_hosts = { }
2333            testbeds = [ ]
2334            for e in top.elements:
2335                if isinstance(e, topdl.Computer):
2336                    tb = e.get_attribute('testbed')
2337                    # Put nodes not in a testbed into the default testbed if
2338                    # there is one.
2339                    if tb is None:
2340                        if self.default_tb is None:
2341                            raise service_error(service_error.req, 
2342                                '%s not in a testbed (and no default)' % e.name)
2343                        tb = self.default_tb
2344                        e.set_attribute('testbed', tb)
2345                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2346                    else: 
2347                        tb_hosts[tb] = [ e.name ]
2348                        testbeds.append(tb)
2349
2350            masters, pmasters = self.get_testbed_services(req, testbeds)
2351            allocated = { }         # Testbeds we can access
2352            topo ={ }               # Sub topologies
2353            connInfo = { }          # Connection information
2354
2355            self.get_access_to_testbeds(testbeds, fid, allocated, 
2356                    tbparams, masters, tbmap, expid, expcert_file)
2357
2358            # tbactive is the set of testbeds that have NATs in front of their
2359            # portals. They need to initiate connections.
2360            tbactive = set([k for k, v in tbparams.items() \
2361                    if v.get_attribute('nat_portals')])
2362
2363            self.split_topology(top, topo, testbeds)
2364
2365            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2366
2367            part = experiment_partition(self.auth, self.store_url, tbmap,
2368                    self.muxmax, self.direct_transit, tbactive)
2369            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2370                    connInfo, expid)
2371
2372            auth_attrs = set()
2373            # Now get access to the dynamic testbeds (those added above)
2374            for tb in [ t for t in topo if t not in allocated]:
2375                self.get_access(tb, tbparams, fid, masters, tbmap, 
2376                        expid, expcert_file)
2377                allocated[tb] = 1
2378                store_keys = topo[tb].get_attribute('store_keys')
2379                # Give the testbed access to keys it exports or imports
2380                if store_keys:
2381                    auth_attrs.update(set([
2382                        (tbparams[tb].allocID, sk) \
2383                                for sk in store_keys.split(" ")]))
2384
2385            if auth_attrs:
2386                self.append_experiment_authorization(expid, auth_attrs)
2387
2388            # transit and disconnected testbeds may not have a connInfo entry.
2389            # Fill in the blanks.
2390            for t in allocated.keys():
2391                if not connInfo.has_key(t):
2392                    connInfo[t] = { }
2393
2394            self.wrangle_software(expid, top, topo, tbparams)
2395
2396            proofs = self.save_federant_information(allocated, tbparams, 
2397                    eid, top)
2398        except service_error, e:
2399            # If something goes wrong in the parse (usually an access error)
2400            # clear the placeholder state.  From here on out the code delays
2401            # exceptions.  Failing at this point returns a fault to the remote
2402            # caller.
2403
2404            self.log.info("Create experiment call failed for %s %s: %s" % 
2405                    (eid, fid, e))
2406            self.clear_placeholder(eid, expid, tmpdir)
2407            raise e
2408
2409        # Start the background swapper and return the starting state.  From
2410        # here on out, the state will stick around a while.
2411
2412        # Create a logger that logs to the experiment's state object as well as
2413        # to the main log file.
2414        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2415        alloc_collector = self.list_log(self.state[eid].log)
2416        h = logging.StreamHandler(alloc_collector)
2417        # XXX: there should be a global one of these rather than repeating the
2418        # code.
2419        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2420                    '%d %b %y %H:%M:%S'))
2421        alloc_log.addHandler(h)
2422
2423        # Start a thread to do the resource allocation
2424        t  = Thread(target=self.allocate_resources,
2425                args=(allocated, masters, eid, expid, tbparams, 
2426                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2427                    connInfo, tbmap, expcert_file),
2428                name=eid)
2429        t.start()
2430
2431        rv = {
2432                'experimentID': [
2433                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2434                ],
2435                'experimentStatus': 'starting',
2436                'proof': [ proof.to_dict() ] + proofs,
2437            }
2438        self.log.info("Create experiment call succeeded for %s %s" % \
2439                (eid, fid))
2440
2441        return rv
2442   
2443    def get_experiment_fedid(self, key):
2444        """
2445        find the fedid associated with the localname key in the state database.
2446        """
2447
2448        rv = None
2449        self.state_lock.acquire()
2450        if key in self.state:
2451            rv = self.state[key].fedid
2452        self.state_lock.release()
2453        return rv
2454
2455    def check_experiment_access(self, fid, key):
2456        """
2457        Confirm that the fid has access to the experiment.  Though a request
2458        may be made in terms of a local name, the access attribute is always
2459        the experiment's fedid.
2460        """
2461        if not isinstance(key, fedid):
2462            key = self.get_experiment_fedid(key)
2463
2464        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2465
2466        if access_ok:
2467            return proof
2468        else:
2469            raise service_error(service_error.access, "Access Denied",
2470                proof)
2471
2472
2473    def get_handler(self, path, fid):
2474        """
2475        Perhaps surprisingly named, this function handles HTTP GET requests to
2476        this server (SOAP requests are POSTs).
2477        """
2478        self.log.info("Get handler %s %s" % (path, fid))
2479        if len("%s" % fid) == 0:
2480            return (None, None)
2481        # XXX: log proofs?
2482        if self.auth.check_attribute(fid, path):
2483            return ("%s/%s" % (self.repodir, path), "application/binary")
2484        else:
2485            return (None, None)
2486
2487    def update_info(self, key, force=False):
2488        top = None
2489        self.state_lock.acquire()
2490        if key in self.state:
2491            if force or self.state[key].older_than(self.info_cache_limit):
2492                top = self.state[key].top
2493                if top is not None: top = top.clone()
2494                d1, info_params, cert, d2 = \
2495                        self.get_segment_info(self.state[key], need_lock=False)
2496        self.state_lock.release()
2497
2498        if top is None: return
2499
2500        try:
2501            tmpdir = tempfile.mkdtemp(prefix="info-")
2502        except EnvironmentError:
2503            raise service_error(service_error.internal, 
2504                    "Cannot create tmp dir")
2505        cert_file = self.make_temp_certfile(cert, tmpdir)
2506
2507        data = []
2508        try:
2509            for k, (uri, aid) in info_params.items():
2510                info=self.info_segment(log=self.log, testbed=uri,
2511                            cert_file=cert_file, cert_pwd=None,
2512                            trusted_certs=self.trusted_certs,
2513                            caller=self.call_InfoSegment)
2514                info(uri, aid)
2515                data.append(info)
2516        # Clean up the tmpdir no matter what
2517        finally:
2518            if tmpdir: self.remove_dirs(tmpdir)
2519
2520        self.annotate_topology(top, data)
2521        self.state_lock.acquire()
2522        if key in self.state:
2523            self.state[key].top = top
2524            self.state[key].updated()
2525            if self.state_filename: self.write_state()
2526        self.state_lock.release()
2527
2528   
2529    def get_info(self, req, fid):
2530        """
2531        Return all the stored info about this experiment
2532        """
2533        rv = None
2534
2535        self.log.info("Info call started for %s" %  fid)
2536        req = req.get('InfoRequestBody', None)
2537        if not req:
2538            raise service_error(service_error.req,
2539                    "Bad request format (no InfoRequestBody)")
2540        exp = req.get('experiment', None)
2541        legacy = req.get('legacy', False)
2542        fresh = req.get('fresh', False)
2543        if exp:
2544            if exp.has_key('fedid'):
2545                key = exp['fedid']
2546                keytype = "fedid"
2547            elif exp.has_key('localname'):
2548                key = exp['localname']
2549                keytype = "localname"
2550            else:
2551                raise service_error(service_error.req, "Unknown lookup type")
2552        else:
2553            raise service_error(service_error.req, "No request?")
2554
2555        try:
2556            proof = self.check_experiment_access(fid, key)
2557        except service_error, e:
2558            self.log.info("Info call failed for %s: access denied" %  fid)
2559
2560
2561        self.update_info(key, fresh)
2562
2563        self.state_lock.acquire()
2564        if self.state.has_key(key):
2565            rv = self.state[key].get_info()
2566            # Copy the topo if we need legacy annotations
2567            if legacy:
2568                top = self.state[key].top
2569                if top is not None: top = top.clone()
2570        self.state_lock.release()
2571        self.log.info("Gathered Info for %s %s" % (key, fid))
2572
2573        # If the legacy visualization and topology representations are
2574        # requested, calculate them and add them to the return.
2575        if legacy and rv is not None:
2576            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2577            if top is not None:
2578                vtopo = topdl.topology_to_vtopo(top)
2579                if vtopo is not None:
2580                    rv['vtopo'] = vtopo
2581                    try:
2582                        vis = self.genviz(vtopo)
2583                    except service_error, e:
2584                        self.log.debug('Problem generating visualization: %s' \
2585                                % e)
2586                        vis = None
2587                    if vis is not None:
2588                        rv['vis'] = vis
2589        if rv:
2590            self.log.info("Info succeded for %s %s" % (key, fid))
2591            rv['proof'] = proof.to_dict()
2592            return rv
2593        else: 
2594            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2595            raise service_error(service_error.req, "No such experiment")
2596
2597    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2598            results):
2599        """
2600        Call OperateSegment on multiple testbeds and gather the results.
2601        op_params contains the parameters needed to contact that testbed, cert
2602        is a certificate containing the fedid to use, op is the operation,
2603        testbeds is a dict mapping testbed name to targets in that testbed,
2604        params are the parameters to include a,d results is a growing list of
2605        the results of the calls.
2606        """
2607        try:
2608            tmpdir = tempfile.mkdtemp(prefix="info-")
2609        except EnvironmentError:
2610            raise service_error(service_error.internal, 
2611                    "Cannot create tmp dir")
2612        cert_file = self.make_temp_certfile(cert, tmpdir)
2613
2614        try:
2615            for tb, targets in testbeds.items():
2616                if tb in op_params:
2617                    uri, aid = op_params[tb]
2618                    operate=self.operation_segment(log=self.log, testbed=uri,
2619                                cert_file=cert_file, cert_pwd=None,
2620                                trusted_certs=self.trusted_certs,
2621                                caller=self.call_OperationSegment)
2622                    if operate(uri, aid, op, targets, params):
2623                        if operate.status is not None:
2624                            results.extend(operate.status)
2625                            continue
2626                # Something went wrong in a weird way.  Add statuses
2627                # that reflect that to results
2628                for t in targets:
2629                    results.append(operation_status(t, 
2630                        operation_status.federant,
2631                        'Unexpected error on %s' % tb))
2632        # Clean up the tmpdir no matter what
2633        finally:
2634            if tmpdir: self.remove_dirs(tmpdir)
2635
2636    def do_operation(self, req, fid):
2637        """
2638        Find the testbeds holding each target and ask them to carry out the
2639        operation.  Return the statuses.
2640        """
2641        # Map an element to the testbed containing it
2642        def element_to_tb(e):
2643            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2644            elif isinstance(e, topdl.Testbed): return e.name
2645            else: return None
2646        # If d is an operation_status object, make it a dict
2647        def make_dict(d):
2648            if isinstance(d, dict): return d
2649            elif isinstance(d, operation_status): return d.to_dict()
2650            else: return { }
2651
2652        def element_name(e):
2653            if isinstance(e, topdl.Computer): return e.name
2654            elif isinstance(e, topdl.Testbed): 
2655                if e.localname: return e.localname[0]
2656                else: return None
2657            else: return None
2658
2659        self.log.info("Operation call started for %s" %  fid)
2660        req = req.get('OperationRequestBody', None)
2661        if not req:
2662            raise service_error(service_error.req,
2663                    "Bad request format (no OperationRequestBody)")
2664        exp = req.get('experiment', None)
2665        op = req.get('operation', None)
2666        targets = set(req.get('target', []))
2667        params = req.get('parameter', None)
2668
2669        if exp:
2670            if 'fedid' in exp:
2671                key = exp['fedid']
2672                keytype = "fedid"
2673            elif 'localname' in exp:
2674                key = exp['localname']
2675                keytype = "localname"
2676            else:
2677                raise service_error(service_error.req, "Unknown lookup type")
2678        else:
2679            raise service_error(service_error.req, "No request?")
2680
2681        if op is None or not targets:
2682            raise service_error(service_error.req, "No request?")
2683
2684        try:
2685            proof = self.check_experiment_access(fid, key)
2686        except service_error, e:
2687            self.log.info("Operation call failed for %s: access denied" %  fid)
2688            raise e
2689
2690        self.state_lock.acquire()
2691        if key in self.state:
2692            d1, op_params, cert, d2 = \
2693                    self.get_segment_info(self.state[key], need_lock=False,
2694                            key='tb')
2695            top = self.state[key].top
2696            if top is not None:
2697                top = top.clone()
2698        self.state_lock.release()
2699
2700        if top is None:
2701            self.log.info("Operation call failed for %s: not active" %  fid)
2702            raise service_error(service_error.partial, "No topology yet", 
2703                    proof=proof)
2704
2705        testbeds = { }
2706        results = []
2707        for e in top.elements:
2708            ename = element_name(e)
2709            if ename in targets:
2710                tb = element_to_tb(e)
2711                targets.remove(ename)
2712                if tb is not None:
2713                    if tb in testbeds: testbeds[tb].append(ename)
2714                    else: testbeds[tb] = [ ename ]
2715                else:
2716                    results.append(operation_status(e.name, 
2717                        code=operation_status.no_target, 
2718                        description='Cannot map target to testbed'))
2719
2720        for t in targets:
2721            results.append(operation_status(t, operation_status.no_target))
2722
2723        self.operate_on_segments(op_params, cert, op, testbeds, params,
2724                results)
2725
2726        self.log.info("Operation call succeeded for %s" %  fid)
2727        return { 
2728                'experiment': exp, 
2729                'status': [make_dict(r) for r in results],
2730                'proof': proof.to_dict()
2731                }
2732
2733
2734    def get_multi_info(self, req, fid):
2735        """
2736        Return all the stored info that this fedid can access
2737        """
2738        rv = { 'info': [ ], 'proof': [ ] }
2739
2740        self.log.info("Multi Info call started for %s" %  fid)
2741        self.get_grouper_updates(fid)
2742        self.auth.update()
2743        self.auth.save()
2744        self.state_lock.acquire()
2745        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2746            try:
2747                proof = self.check_experiment_access(fid, key)
2748            except service_error, e:
2749                if e.code == service_error.access:
2750                    continue
2751                else:
2752                    self.log.info("Multi Info call failed for %s: %s" %  \
2753                            (e,fid))
2754                    self.state_lock.release()
2755                    raise e
2756
2757            if self.state.has_key(key):
2758                e = self.state[key].get_info()
2759                e['proof'] = proof.to_dict()
2760                rv['info'].append(e)
2761                rv['proof'].append(proof.to_dict())
2762        self.state_lock.release()
2763        self.log.info("Multi Info call succeeded for %s" %  fid)
2764        return rv
2765
2766    def check_termination_status(self, fed_exp, force):
2767        """
2768        Confirm that the experiment is sin a valid state to stop (or force it)
2769        return the state - invalid states for deletion and force settings cause
2770        exceptions.
2771        """
2772        self.state_lock.acquire()
2773        status = fed_exp.status
2774
2775        if status:
2776            if status in ('starting', 'terminating'):
2777                if not force:
2778                    self.state_lock.release()
2779                    raise service_error(service_error.partial, 
2780                            'Experiment still being created or destroyed')
2781                else:
2782                    self.log.warning('Experiment in %s state ' % status + \
2783                            'being terminated by force.')
2784            self.state_lock.release()
2785            return status
2786        else:
2787            # No status??? trouble
2788            self.state_lock.release()
2789            raise service_error(service_error.internal,
2790                    "Experiment has no status!?")
2791
2792    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2793        ids = []
2794        term_params = { }
2795        if need_lock: self.state_lock.acquire()
2796        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2797        expcert = fed_exp.identity
2798        repo = "%s" % fed_exp.fedid
2799
2800        # Collect the allocation/segment ids into a dict keyed by the fedid
2801        # of the allocation that contains a tuple of uri, aid
2802        for i, fed in enumerate(fed_exp.get_all_allocations()):
2803            uri = fed.uri
2804            aid = fed.allocID
2805            if key == 'aid': term_params[aid] = (uri, aid)
2806            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2807
2808        if need_lock: self.state_lock.release()
2809        return ids, term_params, expcert, repo
2810
2811
2812    def get_termination_info(self, fed_exp):
2813        self.state_lock.acquire()
2814        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2815        # Change the experiment state
2816        fed_exp.status = 'terminating'
2817        fed_exp.updated()
2818        if self.state_filename: self.write_state()
2819        self.state_lock.release()
2820
2821        return ids, term_params, expcert, repo
2822
2823
2824    def deallocate_resources(self, term_params, expcert, status, force, 
2825            dealloc_log):
2826        tmpdir = None
2827        # This try block makes sure the tempdir is cleared
2828        try:
2829            # If no expcert, try the deallocation as the experiment
2830            # controller instance.
2831            if expcert and self.auth_type != 'legacy': 
2832                try:
2833                    tmpdir = tempfile.mkdtemp(prefix="term-")
2834                except EnvironmentError:
2835                    raise service_error(service_error.internal, 
2836                            "Cannot create tmp dir")
2837                cert_file = self.make_temp_certfile(expcert, tmpdir)
2838                pw = None
2839            else: 
2840                cert_file = self.cert_file
2841                pw = self.cert_pwd
2842
2843            # Stop everyone.  NB, wait_for_all waits until a thread starts
2844            # and then completes, so we can't wait if nothing starts.  So,
2845            # no tbparams, no start.
2846            if len(term_params) > 0:
2847                tp = thread_pool(self.nthreads)
2848                for k, (uri, aid) in term_params.items():
2849                    # Create and start a thread to stop the segment
2850                    tp.wait_for_slot()
2851                    t  = pooled_thread(\
2852                            target=self.terminate_segment(log=dealloc_log,
2853                                testbed=uri,
2854                                cert_file=cert_file, 
2855                                cert_pwd=pw,
2856                                trusted_certs=self.trusted_certs,
2857                                caller=self.call_TerminateSegment),
2858                            args=(uri, aid), name=k,
2859                            pdata=tp, trace_file=self.trace_file)
2860                    t.start()
2861                # Wait for completions
2862                tp.wait_for_all_done()
2863
2864            # release the allocations (failed experiments have done this
2865            # already, and starting experiments may be in odd states, so we
2866            # ignore errors releasing those allocations
2867            try: 
2868                for k, (uri, aid)  in term_params.items():
2869                    self.release_access(None, aid, uri=uri,
2870                            cert_file=cert_file, cert_pwd=pw)
2871            except service_error, e:
2872                if status != 'failed' and not force:
2873                    raise e
2874
2875        # Clean up the tmpdir no matter what
2876        finally:
2877            if tmpdir: self.remove_dirs(tmpdir)
2878
2879    def terminate_experiment(self, req, fid):
2880        """
2881        Swap this experiment out on the federants and delete the shared
2882        information
2883        """
2884        self.log.info("Terminate experiment call started for %s" % fid)
2885        tbparams = { }
2886        req = req.get('TerminateRequestBody', None)
2887        if not req:
2888            raise service_error(service_error.req,
2889                    "Bad request format (no TerminateRequestBody)")
2890
2891        key = self.get_experiment_key(req, 'experiment')
2892        try:
2893            proof = self.check_experiment_access(fid, key)
2894        except service_error, e:
2895            self.log.info(
2896                    "Terminate experiment call failed for %s: access denied" \
2897                            % fid)
2898            raise e
2899        exp = req.get('experiment', False)
2900        force = req.get('force', False)
2901
2902        dealloc_list = [ ]
2903
2904
2905        # Create a logger that logs to the dealloc_list as well as to the main
2906        # log file.
2907        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2908        dealloc_log.info("Terminating %s " %key)
2909        h = logging.StreamHandler(self.list_log(dealloc_list))
2910        # XXX: there should be a global one of these rather than repeating the
2911        # code.
2912        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2913                    '%d %b %y %H:%M:%S'))
2914        dealloc_log.addHandler(h)
2915
2916        self.state_lock.acquire()
2917        fed_exp = self.state.get(key, None)
2918        self.state_lock.release()
2919        repo = None
2920
2921        if fed_exp:
2922            status = self.check_termination_status(fed_exp, force)
2923            # get_termination_info updates the experiment state
2924            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2925            self.deallocate_resources(term_params, expcert, status, force, 
2926                    dealloc_log)
2927
2928            # Remove the terminated experiment
2929            self.state_lock.acquire()
2930            for id in ids:
2931                self.clear_experiment_authorization(id, need_state_lock=False)
2932                if id in self.state: del self.state[id]
2933
2934            if self.state_filename: self.write_state()
2935            self.state_lock.release()
2936
2937            # Delete any synch points associated with this experiment.  All
2938            # synch points begin with the fedid of the experiment.
2939            fedid_keys = set(["fedid:%s" % f for f in ids \
2940                    if isinstance(f, fedid)])
2941            for k in self.synch_store.all_keys():
2942                try:
2943                    if len(k) > 45 and k[0:46] in fedid_keys:
2944                        self.synch_store.del_value(k)
2945                except synch_store.BadDeletionError:
2946                    pass
2947            self.write_store()
2948
2949            # Remove software and other cached stuff from the filesystem.
2950            if repo:
2951                self.remove_dirs("%s/%s" % (self.repodir, repo))
2952       
2953            self.log.info("Terminate experiment succeeded for %s %s" % \
2954                    (key, fid))
2955            return { 
2956                    'experiment': exp , 
2957                    'deallocationLog': string.join(dealloc_list, ''),
2958                    'proof': [proof.to_dict()],
2959                    }
2960        else:
2961            self.log.info("Terminate experiment failed for %s %s: no state" % \
2962                    (key, fid))
2963            raise service_error(service_error.req, "No saved state")
2964
2965
2966    def GetValue(self, req, fid):
2967        """
2968        Get a value from the synchronized store
2969        """
2970        req = req.get('GetValueRequestBody', None)
2971        if not req:
2972            raise service_error(service_error.req,
2973                    "Bad request format (no GetValueRequestBody)")
2974       
2975        name = req.get('name', None)
2976        wait = req.get('wait', False)
2977        rv = { 'name': name }
2978
2979        if not name:
2980            raise service_error(service_error.req, "No name?")
2981
2982        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2983
2984        if access_ok:
2985            self.log.debug("[GetValue] asking for %s " % name)
2986            try:
2987                v = self.synch_store.get_value(name, wait)
2988            except synch_store.RevokedKeyError:
2989                # No more synch on this key
2990                raise service_error(service_error.federant, 
2991                        "Synch key %s revoked" % name)
2992            if v is not None:
2993                rv['value'] = v
2994            rv['proof'] = proof.to_dict()
2995            self.log.debug("[GetValue] got %s from %s" % (v, name))
2996            return rv
2997        else:
2998            raise service_error(service_error.access, "Access Denied",
2999                    proof=proof)
3000       
3001
3002    def SetValue(self, req, fid):
3003        """
3004        Set a value in the synchronized store
3005        """
3006        req = req.get('SetValueRequestBody', None)
3007        if not req:
3008            raise service_error(service_error.req,
3009                    "Bad request format (no SetValueRequestBody)")
3010       
3011        name = req.get('name', None)
3012        v = req.get('value', '')
3013
3014        if not name:
3015            raise service_error(service_error.req, "No name?")
3016
3017        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
3018
3019        if access_ok:
3020            try:
3021                self.synch_store.set_value(name, v)
3022                self.write_store()
3023                self.log.debug("[SetValue] set %s to %s" % (name, v))
3024            except synch_store.CollisionError:
3025                # Translate into a service_error
3026                raise service_error(service_error.req,
3027                        "Value already set: %s" %name)
3028            except synch_store.RevokedKeyError:
3029                # No more synch on this key
3030                raise service_error(service_error.federant, 
3031                        "Synch key %s revoked" % name)
3032                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
3033        else:
3034            raise service_error(service_error.access, "Access Denied",
3035                    proof=proof)
Note: See TracBrowser for help on using the repository browser.