source: fedd/federation/experiment_control.py @ e8f2d4c

Last change on this file since e8f2d4c was 43c0e75, checked in by Ted Faber <faber@…>, 12 years ago

Debugging

  • Property mode set to 100644
File size: 99.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46from deter import topology_to_route_file
47import list_log
48
49
50class nullHandler(logging.Handler):
51    def emit(self, record): pass
52
53fl = logging.getLogger("fedd.experiment_control")
54fl.addHandler(nullHandler())
55
56class experiment_control_local(experiment_control_legacy):
57    """
58    Control of experiments that this system can directly access.
59
60    Includes experiment creation, termination and information dissemination.
61    Thred safe.
62    """
63
64    class ssh_cmd_timeout(RuntimeError): pass
65   
66    call_RequestAccess = service_caller('RequestAccess')
67    call_ReleaseAccess = service_caller('ReleaseAccess')
68    call_StartSegment = service_caller('StartSegment')
69    call_TerminateSegment = service_caller('TerminateSegment')
70    call_InfoSegment = service_caller('InfoSegment')
71    call_OperationSegment = service_caller('OperationSegment')
72    call_Ns2Topdl = service_caller('Ns2Topdl')
73
74    def __init__(self, config=None, auth=None):
75        """
76        Intialize the various attributes, most from the config object
77        """
78
79        def parse_tarfile_list(tf):
80            """
81            Parse a tarfile list from the configuration.  This is a set of
82            paths and tarfiles separated by spaces.
83            """
84            rv = [ ]
85            if tf is not None:
86                tl = tf.split()
87                while len(tl) > 1:
88                    p, t = tl[0:2]
89                    del tl[0:2]
90                    rv.append((p, t))
91            return rv
92
93        self.list_log = list_log.list_log
94
95        self.cert_file = config.get("experiment_control", "cert_file")
96        if self.cert_file:
97            self.cert_pwd = config.get("experiment_control", "cert_pwd")
98        else:
99            self.cert_file = config.get("globals", "cert_file")
100            self.cert_pwd = config.get("globals", "cert_pwd")
101
102        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
103                or config.get("globals", "trusted_certs")
104
105        self.repodir = config.get("experiment_control", "repodir")
106        self.repo_url = config.get("experiment_control", "repo_url", 
107                "https://users.isi.deterlab.net:23235");
108
109        self.exp_stem = "fed-stem"
110        self.log = logging.getLogger("fedd.experiment_control")
111        set_log_level(config, "experiment_control", self.log)
112        self.muxmax = 2
113        self.nthreads = 10
114        self.randomize_experiments = False
115
116        self.splitter = None
117        self.ssh_keygen = "/usr/bin/ssh-keygen"
118        self.ssh_identity_file = None
119
120
121        self.debug = config.getboolean("experiment_control", "create_debug")
122        self.cleanup = not config.getboolean("experiment_control", 
123                "leave_tmpfiles")
124        self.state_filename = config.get("experiment_control", 
125                "experiment_state")
126        self.store_filename = config.get("experiment_control", 
127                "synch_store")
128        self.store_url = config.get("experiment_control", "store_url")
129        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
130        self.grouper_url = config.get("experiment_control", "grouper_url")
131        self.fedkit = parse_tarfile_list(\
132                config.get("experiment_control", "fedkit"))
133        self.gatewaykit = parse_tarfile_list(\
134                config.get("experiment_control", "gatewaykit"))
135
136        dt = config.get("experiment_control", "direct_transit")
137        self.auth_type = config.get('experiment_control', 'auth_type') \
138                or 'legacy'
139        self.auth_dir = config.get('experiment_control', 'auth_dir')
140        self.routing = config.get('experiment_control', 'routing')
141        self.default_tb = config.get('experiment_control', 'default_testbed')
142        # XXX: document this!
143        self.info_cache_limit = \
144                config.getint('experiment_control', 'info_cache', 600)
145        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
146        else: self.direct_transit = [ ]
147        # NB for internal master/slave ops, not experiment setup
148        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
149
150        self.overrides = set([])
151        ovr = config.get('experiment_control', 'overrides')
152        if ovr:
153            for o in ovr.split(","):
154                o = o.strip()
155                if o.startswith('fedid:'): o = o[len('fedid:'):]
156                self.overrides.add(fedid(hexstr=o))
157
158        self.state = { }
159        self.state_lock = Lock()
160        self.tclsh = "/usr/local/bin/otclsh"
161        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
162                config.get("experiment_control", "tcl_splitter",
163                        "/usr/testbed/lib/ns2ir/parse.tcl")
164        mapdb_file = config.get("experiment_control", "mapdb")
165        self.trace_file = sys.stderr
166
167        self.def_expstart = \
168                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
169                "/tmp/federate";
170        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
171                "FEDDIR/hosts";
172        self.def_gwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
174                "/tmp/bridge.log";
175        self.def_mgwstart = \
176                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
177                "/tmp/bridge.log";
178        self.def_gwimage = "FBSD61-TUNNEL2";
179        self.def_gwtype = "pc";
180        self.local_access = { }
181
182        if self.auth_type == 'legacy':
183            if auth:
184                self.auth = auth
185            else:
186                self.log.error( "[access]: No authorizer initialized, " +\
187                        "creating local one.")
188                auth = authorizer()
189            self.get_access = self.legacy_get_access
190        elif self.auth_type == 'abac':
191            self.auth = abac_authorizer(load=self.auth_dir, 
192                    update=os.path.join(self.auth_dir,'update'))
193        else:
194            raise service_error(service_error.internal, 
195                    "Unknown auth_type: %s" % self.auth_type)
196
197        if mapdb_file:
198            self.read_mapdb(mapdb_file)
199        else:
200            self.log.warn("[experiment_control] No testbed map, using defaults")
201            self.tbmap = { 
202                    'deter':'https://users.isi.deterlab.net:23235',
203                    'emulab':'https://users.isi.deterlab.net:23236',
204                    'ucb':'https://users.isi.deterlab.net:23237',
205                    }
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323    def read_mapdb(self, file):
324        """
325        Read a simple colon separated list of mappings for the
326        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
327        also adds testbeds to active if they include , active after
328        their name.
329        """
330
331        self.tbmap = { }
332        lineno =0
333        try:
334            f = open(file, "r")
335            for line in f:
336                lineno += 1
337                line = line.strip()
338                if line.startswith('#') or len(line) == 0:
339                    continue
340                try:
341                    label, url = line.split(':', 1)
342                    self.tbmap[label] = url
343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
346        except EnvironmentError, e:
347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
349        else:
350            f.close()
351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
358        except EnvironmentError, e:
359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
391        except EnvironmentError, e:
392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
397    # XXX this may belong somewhere else
398
399    def get_grouper_updates(self, fid):
400        if self.grouper_url is None: return
401        d = tempfile.mkdtemp()
402        try:
403            fstr = "%s" % fid
404            # XXX locking
405            zipname = os.path.join(d, 'grouper.zip')
406            dest = os.path.join(self.auth_dir, 'update')
407            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
408            f = open(zipname, 'w')
409            f.write(resp.read())
410            f.close()
411            zf = zipfile.ZipFile(zipname, 'r')
412            zf.extractall(dest)
413            zf.close()
414        except URLError, e:
415            self.log.error("Cannot connect to grouper: %s" % e)
416            pass
417        finally:
418            shutil.rmtree(d)
419
420
421    def remove_dirs(self, dir):
422        """
423        Remove the directory tree and all files rooted at dir.  Log any errors,
424        but continue.
425        """
426        self.log.debug("[removedirs]: removing %s" % dir)
427        try:
428            for path, dirs, files in os.walk(dir, topdown=False):
429                for f in files:
430                    os.remove(os.path.join(path, f))
431                for d in dirs:
432                    os.rmdir(os.path.join(path, d))
433            os.rmdir(dir)
434        except EnvironmentError, e:
435            self.log.error("Error deleting directory tree in %s" % e);
436
437    @staticmethod
438    def make_temp_certfile(expcert, tmpdir):
439        """
440        make a protected copy of the access certificate so the experiment
441        controller can act as the experiment principal.  mkstemp is the most
442        secure way to do that. The directory should be created by
443        mkdtemp.  Return the filename.
444        """
445        if expcert and tmpdir:
446            try:
447                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
448                f = os.fdopen(certf, 'w')
449                print >> f, expcert
450                f.close()
451            except EnvironmentError, e:
452                raise service_error(service_error.internal, 
453                        "Cannot create temp cert file?")
454            return certfn
455        else:
456            return None
457
458       
459    def generate_ssh_keys(self, dest, type="rsa" ):
460        """
461        Generate a set of keys for the gateways to use to talk.
462
463        Keys are of type type and are stored in the required dest file.
464        """
465        valid_types = ("rsa", "dsa")
466        t = type.lower();
467        if t not in valid_types: raise ValueError
468        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
469
470        try:
471            trace = open("/dev/null", "w")
472        except EnvironmentError:
473            raise service_error(service_error.internal,
474                    "Cannot open /dev/null??");
475
476        # May raise CalledProcessError
477        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
478        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
479        if rv != 0:
480            raise service_error(service_error.internal, 
481                    "Cannot generate nonce ssh keys.  %s return code %d" \
482                            % (self.ssh_keygen, rv))
483
484    def generate_seer_certs(self, destdir):
485        '''
486        Create a SEER ca cert and a node cert in destdir/ca.pem and
487        destdir/node.pem respectively.  These will be distributed throughout
488        the federated experiment.  This routine reports errors via
489        service_errors.
490        '''
491        openssl = '/usr/bin/openssl'
492        # All the filenames and parameters we need for openssl calls below
493        ca_key =os.path.join(destdir, 'ca.key') 
494        ca_pem = os.path.join(destdir, 'ca.pem')
495        node_key =os.path.join(destdir, 'node.key') 
496        node_pem = os.path.join(destdir, 'node.pem')
497        node_req = os.path.join(destdir, 'node.req')
498        node_signed = os.path.join(destdir, 'node.signed')
499        days = '%s' % (365 * 10)
500        serial = '%s' % random.randint(0, 1<<16)
501
502        try:
503            # Sequence of calls to create a CA key, create a ca cert, create a
504            # node key, node signing request, and finally a signed node
505            # certificate.
506            sequence = (
507                    (openssl, 'genrsa', '-out', ca_key, '1024'),
508                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
509                        ca_pem, '-days', days, '-subj', 
510                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
511                    (openssl, 'genrsa', '-out', node_key, '1024'),
512                    (openssl, 'req', '-new', '-key', node_key, '-out', 
513                        node_req, '-days', days, '-subj', 
514                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
515                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
516                        '-set_serial', serial, '-req', '-in', node_req, 
517                        '-out', node_signed, '-days', days),
518                )
519            # Do all that stuff; bail if there's an error, and push all the
520            # output to dev/null.
521            for cmd in sequence:
522                trace = open("/dev/null", "w")
523                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
524                if rv != 0:
525                    raise service_error(service_error.internal, 
526                            "Cannot generate SEER certs.  %s return code %d" \
527                                    % (' '.join(cmd), rv))
528            # Concatinate the node key and signed certificate into node.pem
529            f = open(node_pem, 'w')
530            for comp in (node_signed, node_key):
531                g = open(comp, 'r')
532                f.write(g.read())
533                g.close()
534            f.close()
535
536            # Throw out intermediaries.
537            for fn in (ca_key, node_key, node_req, node_signed):
538                os.unlink(fn)
539
540        except EnvironmentError, e:
541            # Any difficulties with the file system wind up here
542            raise service_error(service_error.internal,
543                    "File error on  %s while creating SEER certs: %s" % \
544                            (e.filename, e.strerror))
545
546
547
548    def gentopo(self, str):
549        """
550        Generate the topology data structure from the splitter's XML
551        representation of it.
552
553        The topology XML looks like:
554            <experiment>
555                <nodes>
556                    <node><vname></vname><ips>ip1:ip2</ips></node>
557                </nodes>
558                <lans>
559                    <lan>
560                        <vname></vname><vnode></vnode><ip></ip>
561                        <bandwidth></bandwidth><member>node:port</member>
562                    </lan>
563                </lans>
564        """
565        class topo_parse:
566            """
567            Parse the topology XML and create the dats structure.
568            """
569            def __init__(self):
570                # Typing of the subelements for data conversion
571                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
572                self.int_subelements = ( 'bandwidth',)
573                self.float_subelements = ( 'delay',)
574                # The final data structure
575                self.nodes = [ ]
576                self.lans =  [ ]
577                self.topo = { \
578                        'node': self.nodes,\
579                        'lan' : self.lans,\
580                    }
581                self.element = { }  # Current element being created
582                self.chars = ""     # Last text seen
583
584            def end_element(self, name):
585                # After each sub element the contents is added to the current
586                # element or to the appropriate list.
587                if name == 'node':
588                    self.nodes.append(self.element)
589                    self.element = { }
590                elif name == 'lan':
591                    self.lans.append(self.element)
592                    self.element = { }
593                elif name in self.str_subelements:
594                    self.element[name] = self.chars
595                    self.chars = ""
596                elif name in self.int_subelements:
597                    self.element[name] = int(self.chars)
598                    self.chars = ""
599                elif name in self.float_subelements:
600                    self.element[name] = float(self.chars)
601                    self.chars = ""
602
603            def found_chars(self, data):
604                self.chars += data.rstrip()
605
606
607        tp = topo_parse();
608        parser = xml.parsers.expat.ParserCreate()
609        parser.EndElementHandler = tp.end_element
610        parser.CharacterDataHandler = tp.found_chars
611
612        parser.Parse(str)
613
614        return tp.topo
615       
616
617    def genviz(self, topo):
618        """
619        Generate the visualization the virtual topology
620        """
621
622        neato = "/usr/local/bin/neato"
623        # These are used to parse neato output and to create the visualization
624        # file.
625        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
626        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
627                "%s</type></node>"
628
629        try:
630            # Node names
631            nodes = [ n['vname'] for n in topo['node'] ]
632            topo_lans = topo['lan']
633        except KeyError, e:
634            raise service_error(service_error.internal, "Bad topology: %s" %e)
635
636        lans = { }
637        links = { }
638
639        # Walk through the virtual topology, organizing the connections into
640        # 2-node connections (links) and more-than-2-node connections (lans).
641        # When a lan is created, it's added to the list of nodes (there's a
642        # node in the visualization for the lan).
643        for l in topo_lans:
644            if links.has_key(l['vname']):
645                if len(links[l['vname']]) < 2:
646                    links[l['vname']].append(l['vnode'])
647                else:
648                    nodes.append(l['vname'])
649                    lans[l['vname']] = links[l['vname']]
650                    del links[l['vname']]
651                    lans[l['vname']].append(l['vnode'])
652            elif lans.has_key(l['vname']):
653                lans[l['vname']].append(l['vnode'])
654            else:
655                links[l['vname']] = [ l['vnode'] ]
656
657
658        # Open up a temporary file for dot to turn into a visualization
659        try:
660            df, dotname = tempfile.mkstemp()
661            dotfile = os.fdopen(df, 'w')
662        except EnvironmentError:
663            raise service_error(service_error.internal,
664                    "Failed to open file in genviz")
665
666        try:
667            dnull = open('/dev/null', 'w')
668        except EnvironmentError:
669            service_error(service_error.internal,
670                    "Failed to open /dev/null in genviz")
671
672        # Generate a dot/neato input file from the links, nodes and lans
673        try:
674            print >>dotfile, "graph G {"
675            for n in nodes:
676                print >>dotfile, '\t"%s"' % n
677            for l in links.keys():
678                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
679            for l in lans.keys():
680                for n in lans[l]:
681                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
682            print >>dotfile, "}"
683            dotfile.close()
684        except TypeError:
685            raise service_error(service_error.internal,
686                    "Single endpoint link in vtopo")
687        except EnvironmentError:
688            raise service_error(service_error.internal, "Cannot write dot file")
689
690        # Use dot to create a visualization
691        try:
692            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
693                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
694                stderr=dnull, close_fds=True)
695        except EnvironmentError:
696            raise service_error(service_error.internal, 
697                    "Cannot generate visualization: is graphviz available?")
698        dnull.close()
699
700        # Translate dot to vis format
701        vis_nodes = [ ]
702        vis = { 'node': vis_nodes }
703        for line in dot.stdout:
704            m = vis_re.match(line)
705            if m:
706                vn = m.group(1)
707                vis_node = {'name': vn, \
708                        'x': float(m.group(2)),\
709                        'y' : float(m.group(3)),\
710                    }
711                if vn in links.keys() or vn in lans.keys():
712                    vis_node['type'] = 'lan'
713                else:
714                    vis_node['type'] = 'node'
715                vis_nodes.append(vis_node)
716        rv = dot.wait()
717
718        os.remove(dotname)
719        # XXX: graphviz seems to use low return codes for warnings, like
720        # "couldn't find font"
721        if rv < 2 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : 
740                        {'allocID': {'fedid': aid}},}, 
741                    fedid(file=cert_file))
742            resp = { 'ReleaseAccessResponseBody': resp } 
743        else:
744            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
745                    cert_file, cert_pwd, self.trusted_certs)
746
747        # better error coding
748
749    def remote_ns2topdl(self, uri, desc):
750
751        req = {
752                'description' : { 'ns2description': desc },
753            }
754
755        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
756                self.trusted_certs)
757
758        if r.has_key('Ns2TopdlResponseBody'):
759            r = r['Ns2TopdlResponseBody']
760            ed = r.get('experimentdescription', None)
761            if ed.has_key('topdldescription'):
762                return topdl.Topology(**ed['topdldescription'])
763            else:
764                raise service_error(service_error.protocol, 
765                        "Bad splitter response (no output)")
766        else:
767            raise service_error(service_error.protocol, "Bad splitter response")
768
769    class start_segment:
770        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
771                cert_pwd=None, trusted_certs=None, caller=None,
772                log_collector=None):
773            self.log = log
774            self.debug = debug
775            self.cert_file = cert_file
776            self.cert_pwd = cert_pwd
777            self.trusted_certs = None
778            self.caller = caller
779            self.testbed = testbed
780            self.log_collector = log_collector
781            self.response = None
782            self.node = { }
783            self.subs = { }
784            self.tb = { }
785            self.proof = None
786
787        def make_map(self, resp):
788            if 'segmentdescription' not in resp  or \
789                    'topdldescription' not in resp['segmentdescription']:
790                self.log.warn('No topology returned from startsegment')
791                return 
792
793            top = topdl.Topology(
794                    **resp['segmentdescription']['topdldescription'])
795
796            for e in top.elements:
797                if isinstance(e, topdl.Computer):
798                    self.node[e.name] = e
799                elif isinstance(e, topdl.Testbed):
800                    self.tb[e.uri] = e
801            for s in top.substrates:
802                self.subs[s.name] = s
803
804        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
805            req = {
806                    'allocID': { 'fedid' : aid }, 
807                    'segmentdescription': { 
808                        'topdldescription': topo.to_dict(),
809                    },
810                }
811
812            if connInfo:
813                req['connection'] = connInfo
814
815            import_svcs = [ s for m in masters.values() \
816                    for s in m if self.testbed in s.importers]
817
818            if import_svcs or self.testbed in masters:
819                req['service'] = []
820
821            for s in import_svcs:
822                for r in s.reqs:
823                    sr = copy.deepcopy(r)
824                    sr['visibility'] = 'import';
825                    req['service'].append(sr)
826
827            for s in masters.get(self.testbed, []):
828                for r in s.reqs:
829                    sr = copy.deepcopy(r)
830                    sr['visibility'] = 'export';
831                    req['service'].append(sr)
832
833            if attrs:
834                req['fedAttr'] = attrs
835
836            try:
837                self.log.debug("Calling StartSegment at %s " % uri)
838                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
839                        self.trusted_certs)
840                if r.has_key('StartSegmentResponseBody'):
841                    lval = r['StartSegmentResponseBody'].get('allocationLog',
842                            None)
843                    if lval and self.log_collector:
844                        for line in  lval.splitlines(True):
845                            self.log_collector.write(line)
846                    self.make_map(r['StartSegmentResponseBody'])
847                    if 'proof' in r: self.proof = r['proof']
848                    self.response = r
849                else:
850                    raise service_error(service_error.internal, 
851                            "Bad response!?: %s" %r)
852                return True
853            except service_error, e:
854                self.log.error("Start segment failed on %s: %s" % \
855                        (self.testbed, e))
856                return False
857
858
859
860    class terminate_segment:
861        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
862                cert_pwd=None, trusted_certs=None, caller=None):
863            self.log = log
864            self.debug = debug
865            self.cert_file = cert_file
866            self.cert_pwd = cert_pwd
867            self.trusted_certs = None
868            self.caller = caller
869            self.testbed = testbed
870
871        def __call__(self, uri, aid ):
872            req = {
873                    'allocID': {'fedid': aid }, 
874                }
875            self.log.info("Calling terminate segment")
876            try:
877                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
878                        self.trusted_certs)
879                self.log.info("Terminate segment succeeded")
880                return True
881            except service_error, e:
882                self.log.error("Terminate segment failed on %s: %s" % \
883                        (self.testbed, e))
884                return False
885
886    class info_segment(start_segment):
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None,
889                log_collector=None):
890            experiment_control_local.start_segment.__init__(self, debug, 
891                    log, testbed, cert_file, cert_pwd, trusted_certs, 
892                    caller, log_collector)
893
894        def __call__(self, uri, aid):
895            req = { 'allocID': { 'fedid' : aid } }
896
897            try:
898                self.log.debug("Calling InfoSegment at %s " % uri)
899                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
900                        self.trusted_certs)
901                if r.has_key('InfoSegmentResponseBody'):
902                    self.make_map(r['InfoSegmentResponseBody'])
903                    if 'proof' in r: self.proof = r['proof']
904                    self.response = r
905                else:
906                    raise service_error(service_error.internal, 
907                            "Bad response!?: %s" %r)
908                return True
909            except service_error, e:
910                self.log.error("Info segment failed on %s: %s" % \
911                        (self.testbed, e))
912                return False
913
914    class operation_segment:
915        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
916                cert_pwd=None, trusted_certs=None, caller=None,
917                log_collector=None):
918            self.log = log
919            self.debug = debug
920            self.cert_file = cert_file
921            self.cert_pwd = cert_pwd
922            self.trusted_certs = None
923            self.caller = caller
924            self.testbed = testbed
925            self.status = None
926
927        def __call__(self, uri, aid, op, targets, params):
928            req = { 
929                    'allocID': { 'fedid' : aid },
930                    'operation': op,
931                    'target': targets,
932                    }
933            if params: req['parameter'] = params
934
935
936            try:
937                self.log.debug("Calling OperationSegment at %s " % uri)
938                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
939                        self.trusted_certs)
940                if 'OperationSegmentResponseBody' in r:
941                    r = r['OperationSegmentResponseBody']
942                    if 'status' in r:
943                        self.status = r['status']
944                else:
945                    raise service_error(service_error.internal, 
946                            "Bad response!?: %s" %r)
947                return True
948            except service_error, e:
949                self.log.error("Operation segment failed on %s: %s" % \
950                        (self.testbed, e))
951                return False
952
953    def annotate_topology(self, top, data):
954        # These routines do various parts of the annotation
955        def add_new_names(nl, l):
956            """ add any names in nl to the list in l """
957            for n in nl:
958                if n not in l: l.append(n)
959       
960        def merge_services(ne, e):
961            for ns in ne.service:
962                # NB: the else is on the for
963                for s in e.service:
964                    if ns.name == s.name:
965                        s.importer = ns.importer
966                        s.param = ns.param
967                        s.description = ns.description
968                        s.status = ns.status
969                        break
970                else:
971                    e.service.append(ns)
972       
973        def merge_oses(ne, e):
974            """
975            Merge the operating system entries of ne into e
976            """
977            for nos in ne.os:
978                # NB: the else is on the for
979                for os in e.os:
980                    if nos.name == os.name:
981                        os.version = nos.version
982                        os.version = nos.distribution
983                        os.version = nos.distributionversion
984                        for a in nos.attribute:
985                            if os.get_attribute(a.attribute):
986                                os.remove_attribute(a.attribute)
987                            os.set_attribute(a.attribute, a.value)
988                        break
989                else:
990                    # If both nodes have one OS, this is a replacement
991                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
992                    else: e.os.append(nos)
993
994        # Annotate the topology with embedding info
995        for e in top.elements:
996            if isinstance(e, topdl.Computer):
997                for s in data:
998                    ne = s.node.get(e.name, None)
999                    if ne is not None:
1000                        add_new_names(ne.localname, e.localname)
1001                        e.status = ne.status
1002                        merge_services(ne, e)
1003                        add_new_names(ne.operation, e.operation)
1004                        if ne.os: merge_oses(ne, e)
1005                        break
1006            elif isinstance(e,topdl.Testbed):
1007                for s in data:
1008                    ne = s.tb.get(e.uri, None)
1009                    if ne is not None:
1010                        add_new_names(ne.localname, e.localname)
1011                        add_new_names(ne.operation, e.operation)
1012                        merge_services(ne, e)
1013                        for a in ne.attribute:
1014                            e.set_attribute(a.attribute, a.value)
1015        # Annotate substrates
1016        for s in top.substrates:
1017            for d in data:
1018                ss = d.subs.get(s.name, None)
1019                if ss is not None:
1020                    if ss.capacity is not None:
1021                        s.capacity = ss.capacity
1022                    if s.latency is not None:
1023                        s.latency = ss.latency
1024
1025
1026
1027    def allocate_resources(self, allocated, masters, eid, expid, 
1028            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1029            attrs=None, connInfo={}, tbmap=None, expcert=None):
1030
1031        started = { }           # Testbeds where a sub-experiment started
1032                                # successfully
1033
1034        # XXX
1035        fail_soft = False
1036
1037        if tbmap is None: tbmap = { }
1038
1039        log = alloc_log or self.log
1040
1041        tp = thread_pool(self.nthreads)
1042        threads = [ ]
1043        starters = [ ]
1044
1045        if expcert:
1046            cert = expcert
1047            pw = None
1048        else:
1049            cert = self.cert_file
1050            pw = self.cert_pwd
1051
1052        for tb in allocated.keys():
1053            # Create and start a thread to start the segment, and save it
1054            # to get the return value later
1055            tb_attrs = copy.copy(attrs)
1056            tp.wait_for_slot()
1057            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1058            base, suffix = split_testbed(tb)
1059            if suffix:
1060                tb_attrs.append({'attribute': 'experiment_name', 
1061                    'value': "%s-%s" % (eid, suffix)})
1062            else:
1063                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1064            if not uri:
1065                raise service_error(service_error.internal, 
1066                        "Unknown testbed %s !?" % tb)
1067
1068            aid = tbparams[tb].allocID
1069            if not aid:
1070                raise service_error(service_error.internal, 
1071                        "No alloc id for testbed %s !?" % tb)
1072
1073            s = self.start_segment(log=log, debug=self.debug,
1074                    testbed=tb, cert_file=cert,
1075                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1076                    caller=self.call_StartSegment,
1077                    log_collector=log_collector)
1078            starters.append(s)
1079            t  = pooled_thread(\
1080                    target=s, name=tb,
1081                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1082                    pdata=tp, trace_file=self.trace_file)
1083            threads.append(t)
1084            t.start()
1085
1086        # Wait until all finish (keep pinging the log, though)
1087        mins = 0
1088        revoked = False
1089        while not tp.wait_for_all_done(60.0):
1090            mins += 1
1091            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1092                    % mins)
1093            if not revoked and \
1094                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1095                # a testbed has failed.  Revoke this experiment's
1096                # synchronizarion values so that sub experiments will not
1097                # deadlock waiting for synchronization that will never happen
1098                self.log.info("A subexperiment has failed to swap in, " + \
1099                        "revoking synch keys")
1100                var_key = "fedid:%s" % expid
1101                for k in self.synch_store.all_keys():
1102                    if len(k) > 45 and k[0:46] == var_key:
1103                        self.synch_store.revoke_key(k)
1104                revoked = True
1105
1106        failed = [ t.getName() for t in threads if not t.rv ]
1107        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1108
1109        # If one failed clean up, unless fail_soft is set
1110        if failed:
1111            if not fail_soft:
1112                tp.clear()
1113                for tb in succeeded:
1114                    # Create and start a thread to stop the segment
1115                    tp.wait_for_slot()
1116                    uri = tbparams[tb].uri
1117                    t  = pooled_thread(\
1118                            target=self.terminate_segment(log=log,
1119                                testbed=tb,
1120                                cert_file=cert, 
1121                                cert_pwd=pw,
1122                                trusted_certs=self.trusted_certs,
1123                                caller=self.call_TerminateSegment),
1124                            args=(uri, tbparams[tb].allocID),
1125                            name=tb,
1126                            pdata=tp, trace_file=self.trace_file)
1127                    t.start()
1128                # Wait until all finish (if any are being stopped)
1129                if succeeded:
1130                    tp.wait_for_all_done()
1131
1132                # release the allocations
1133                for tb in tbparams.keys():
1134                    try:
1135                        self.release_access(tb, tbparams[tb].allocID, 
1136                                tbmap=tbmap, uri=tbparams[tb].uri,
1137                                cert_file=cert, cert_pwd=pw)
1138                    except service_error, e:
1139                        self.log.warn("Error releasing access: %s" % e.desc)
1140                # Remove the placeholder
1141                self.state_lock.acquire()
1142                self.state[eid].status = 'failed'
1143                self.state[eid].updated()
1144                if self.state_filename: self.write_state()
1145                self.state_lock.release()
1146                # Remove the repo dir
1147                self.remove_dirs("%s/%s" %(self.repodir, expid))
1148                # Walk up tmpdir, deleting as we go
1149                if self.cleanup:
1150                    self.remove_dirs(tmpdir)
1151                else:
1152                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1153
1154
1155                log.error("Swap in failed on %s" % ",".join(failed))
1156                return
1157        else:
1158            # Walk through the successes and gather the proofs
1159            proofs = { }
1160            for s in starters:
1161                if s.proof:
1162                    proofs[s.testbed] = s.proof
1163            self.annotate_topology(top, starters)
1164            log.info("[start_segment]: Experiment %s active" % eid)
1165
1166
1167        # Walk up tmpdir, deleting as we go
1168        if self.cleanup:
1169            self.remove_dirs(tmpdir)
1170        else:
1171            log.debug("[start_experiment]: not removing %s" % tmpdir)
1172
1173        # Insert the experiment into our state and update the disk copy.
1174        self.state_lock.acquire()
1175        self.state[expid].status = 'active'
1176        self.state[eid] = self.state[expid]
1177        self.state[eid].top = top
1178        self.state[eid].updated()
1179        # Append startup proofs
1180        for f in self.state[eid].get_all_allocations():
1181            if f.tb in proofs:
1182                f.proof.append(proofs[f.tb])
1183
1184        if self.state_filename: self.write_state()
1185        self.state_lock.release()
1186        return
1187
1188
1189    def add_kit(self, e, kit):
1190        """
1191        Add a Software object created from the list of (install, location)
1192        tuples passed as kit  to the software attribute of an object e.  We
1193        do this enough to break out the code, but it's kind of a hack to
1194        avoid changing the old tuple rep.
1195        """
1196
1197        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1198
1199        if isinstance(e.software, list): e.software.extend(s)
1200        else: e.software = s
1201
1202    def append_experiment_authorization(self, expid, attrs, 
1203            need_state_lock=True):
1204        """
1205        Append the authorization information to system state
1206        """
1207
1208        for p, a in attrs:
1209            if p and a:
1210                # Improperly configured overrides can add bad principals.
1211                self.auth.set_attribute(p, a)
1212            else:
1213                self.log.debug("Bad append experiment_authorization %s %s" \
1214                        % (p, a))
1215        self.auth.save()
1216
1217        if need_state_lock: self.state_lock.acquire()
1218        # XXX: really a no op?
1219        #self.state[expid]['auth'].update(attrs)
1220        if self.state_filename: self.write_state()
1221        if need_state_lock: self.state_lock.release()
1222
1223    def clear_experiment_authorization(self, expid, need_state_lock=True):
1224        """
1225        Attrs is a set of attribute principal pairs that need to be removed
1226        from the authenticator.  Remove them and save the authenticator.
1227        """
1228
1229        if need_state_lock: self.state_lock.acquire()
1230        # XXX: should be a no-op
1231        #if expid in self.state and 'auth' in self.state[expid]:
1232            #for p, a in self.state[expid]['auth']:
1233                #self.auth.unset_attribute(p, a)
1234            #self.state[expid]['auth'] = set()
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237        self.auth.save()
1238
1239
1240    def create_experiment_state(self, fid, req, expid, expcert,
1241            state='starting'):
1242        """
1243        Create the initial entry in the experiment's state.  The expid and
1244        expcert are the experiment's fedid and certifacte that represents that
1245        ID, which are installed in the experiment state.  If the request
1246        includes a suggested local name that is used if possible.  If the local
1247        name is already taken by an experiment owned by this user that has
1248        failed, it is overwritten.  Otherwise new letters are added until a
1249        valid localname is found.  The generated local name is returned.
1250        """
1251
1252        if req.has_key('experimentID') and \
1253                req['experimentID'].has_key('localname'):
1254            overwrite = False
1255            eid = req['experimentID']['localname']
1256            # If there's an old failed experiment here with the same local name
1257            # and accessible by this user, we'll overwrite it, otherwise we'll
1258            # fall through and do the collision avoidance.
1259            old_expid = self.get_experiment_fedid(eid)
1260            if old_expid:
1261                users_experiment = True
1262                try:
1263                    self.check_experiment_access(fid, old_expid)
1264                except service_error, e:
1265                    if e.code == service_error.access: users_experiment = False
1266                    else: raise e
1267                if users_experiment:
1268                    self.state_lock.acquire()
1269                    status = self.state[eid].status
1270                    if status and status == 'failed':
1271                        # remove the old access attributes
1272                        self.clear_experiment_authorization(eid,
1273                                need_state_lock=False)
1274                        overwrite = True
1275                        del self.state[eid]
1276                        del self.state[old_expid]
1277                    self.state_lock.release()
1278                else:
1279                    self.log.info('Experiment %s exists, ' % eid + \
1280                            'but this user cannot access it')
1281            self.state_lock.acquire()
1282            while (self.state.has_key(eid) and not overwrite):
1283                eid += random.choice(string.ascii_letters)
1284            # Initial state
1285            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1286                    identity=expcert)
1287            self.state[expid] = self.state[eid]
1288            if self.state_filename: self.write_state()
1289            self.state_lock.release()
1290        else:
1291            eid = self.exp_stem
1292            for i in range(0,5):
1293                eid += random.choice(string.ascii_letters)
1294            self.state_lock.acquire()
1295            while (self.state.has_key(eid)):
1296                eid = self.exp_stem
1297                for i in range(0,5):
1298                    eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305
1306        # Let users touch the state.  Authorize this fid and the expid itself
1307        # to touch the experiment, as well as allowing th eoverrides.
1308        self.append_experiment_authorization(eid, 
1309                set([(fid, expid), (expid,expid)] + \
1310                        [ (o, expid) for o in self.overrides]))
1311
1312        return eid
1313
1314
1315    def allocate_ips_to_topo(self, top):
1316        """
1317        Add an ip4_address attribute to all the hosts in the topology that have
1318        not already been assigned one by the user, based on the shared
1319        substrates on which they sit.  An /etc/hosts file is also created and
1320        returned as a list of hostfiles entries.  We also return the allocator,
1321        because we may need to allocate IPs to portals
1322        (specifically DRAGON portals).
1323        """
1324        subs = sorted(top.substrates, 
1325                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1326                reverse=True)
1327        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1328        ifs = { }
1329        hosts = [ ]
1330
1331        assigned = { }
1332        assigned_subs = set()
1333        # Look for IP networks that were already assigned to the experiment by
1334        # the user.  We assume that users are smart in making that assignment,
1335        # but we check them later.
1336        for e in top.elements:
1337            if not isinstance(e, topdl.Computer): continue
1338            for inf in e.interface:
1339                a = inf.get_attribute('ip4_address')
1340                if not a: continue
1341                for s in inf.substrate:
1342                    assigned_subs.add(s)
1343                a = ip_addr(a)
1344                n = ip_addr(inf.get_attribute('ip4_netmask'))
1345
1346                sz = 0x100000000 - long(n)
1347                net = (long(a) & long(n)) & 0xffffffff
1348                if net in assigned:
1349                    if sz > assigned[net]: assigned[net] = sz
1350                assigned[net] = sz
1351
1352        # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER
1353        assigned[long(ip_addr('10.0.23.0'))] = 256
1354        # end of hack
1355
1356        # Walk all substrates, smallest to largest, assigning IP addresses to
1357        # the unassigned and checking the assigned.  Build an /etc/hosts file
1358        # as well.
1359        for idx, s in enumerate(subs):
1360            if s.name not in assigned_subs:
1361                net_size = len(s.interfaces)+2
1362
1363                # Get an ip allocation for this substrate.  Make sure the
1364                # allocation does not collide with any user allocations.
1365                ok = False
1366                while not ok:
1367                    a = ips.allocate(net_size)
1368                    if not a:
1369                        raise service_error(service_error.req, 
1370                                "Cannot allocate IP addresses")
1371                    base, num = a
1372                    if num < net_size: 
1373                        raise service_error(service_error.internal,
1374                                "Allocator returned wrong number of IPs??")
1375                    # Check for overlap.  An assigned network could contain an
1376                    # endpoint of the new allocation or the new allocation
1377                    # could contain an endpoint of an assigned network.
1378                    # NB the else is attached to the for - if the loop is not
1379                    # exited by a break, ok = True.
1380                    for n, sz in assigned.items():
1381                        if base >= n and base < n + sz:
1382                            ok = False
1383                            break
1384                        if base + num > n and base + num  < n + sz:
1385                            ok = False
1386                            break
1387                        if n >= base and n < base + num:
1388                            ok = False
1389                            break
1390                        if n+sz > base and n+sz < base + num:
1391                            ok = False
1392                            break
1393                    else:
1394                        ok = True
1395                           
1396                mask = ips.min_alloc
1397                while mask < net_size:
1398                    mask *= 2
1399
1400                netmask = ((2**32-1) ^ (mask-1))
1401            else:
1402                base = 0
1403
1404            # Add the attributes and build hosts
1405            base += 1
1406            for i in s.interfaces:
1407                # Add address and masks to interfaces on unassigned substrates
1408                if s.name not in assigned_subs:
1409                    i.attribute.append(
1410                            topdl.Attribute('ip4_address', 
1411                                "%s" % ip_addr(base)))
1412                    i.attribute.append(
1413                            topdl.Attribute('ip4_netmask', 
1414                                "%s" % ip_addr(int(netmask))))
1415
1416                # Make /etc/hosts entries
1417                addr = i.get_attribute('ip4_address')
1418                if addr is None:
1419                    raise service_error(service_error.req, 
1420                            "Partially assigned substrate %s" %s.name)
1421                hname = i.element.name
1422                if hname in ifs:
1423                    hosts.append("%s\t%s-%s %s-%d" % \
1424                            (addr, hname, s.name, hname, ifs[hname]))
1425                else:
1426                    # First IP allocated it the default ip according to hosts,
1427                    # so the extra alias is here.
1428                    ifs[hname] = 0
1429                    hosts.append("%s\t%s-%s %s-%d %s" % \
1430                            (addr, hname, s.name, hname, ifs[hname], hname))
1431
1432                ifs[hname] += 1
1433                base += 1
1434        return hosts, ips
1435
1436    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1437            tbparam, masters, tbmap, expid=None, expcert=None):
1438        for tb in testbeds:
1439            self.log.debug('Trying testbed %s' % tb)
1440            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1441                    expcert)
1442            allocated[tb] = 1
1443            self.log.debug('Got testbed %s' % tb)
1444
1445    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1446            expcert=None):
1447        """
1448        Get access to testbed through fedd and set the parameters for that tb
1449        """
1450        def get_export_project(svcs):
1451            """
1452            Look through for the list of federated_service for this testbed
1453            objects for a project_export service, and extract the project
1454            parameter.
1455            """
1456
1457            pe = [s for s in svcs if s.name=='project_export']
1458            if len(pe) == 1:
1459                return pe[0].params.get('project', None)
1460            elif len(pe) == 0:
1461                return None
1462            else:
1463                raise service_error(service_error.req,
1464                        "More than one project export is not supported")
1465
1466        def add_services(svcs, type, slist, keys):
1467            """
1468            Add the given services to slist.  type is import or export.  Also
1469            add a mapping entry from the assigned id to the original service
1470            record.
1471            """
1472            for i, s in enumerate(svcs):
1473                idx = '%s%d' % (type, i)
1474                keys[idx] = s
1475                sr = {'id': idx, 'name': s.name, 'visibility': type }
1476                if s.params:
1477                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1478                            for k, v in s.params.items()]
1479                slist.append(sr)
1480
1481        uri = tbmap.get(testbed_base(tb), None)
1482        if not uri:
1483            raise service_error(service_error.server_config, 
1484                    "Unknown testbed: %s" % tb)
1485
1486        export_svcs = masters.get(tb,[])
1487        import_svcs = [ s for m in masters.values() \
1488                for s in m \
1489                    if tb in s.importers ]
1490
1491        export_project = get_export_project(export_svcs)
1492        # Compose the credential list so that IDs come before attributes
1493        creds = set()
1494        keys = set()
1495        certs = self.auth.get_creds_for_principal(fid)
1496        # Append credenials about this experiment controller - e.g. that it is
1497        # trusted.
1498        certs.update(self.auth.get_creds_for_principal(
1499            fedid(file=self.cert_file)))
1500        if expid:
1501            certs.update(self.auth.get_creds_for_principal(expid))
1502        for c in certs:
1503            keys.add(c.issuer_cert())
1504            creds.add(c.attribute_cert())
1505        creds = list(keys) + list(creds)
1506
1507        if expcert: cert, pw = expcert, None
1508        else: cert, pw = self.cert_file, self.cert_pw
1509
1510        # Request credentials
1511        req = {
1512                'abac_credential': creds,
1513            }
1514        # Make the service request from the services we're importing and
1515        # exporting.  Keep track of the export request ids so we can
1516        # collect the resulting info from the access response.
1517        e_keys = { }
1518        if import_svcs or export_svcs:
1519            slist = []
1520            add_services(import_svcs, 'import', slist, e_keys)
1521            add_services(export_svcs, 'export', slist, e_keys)
1522            req['service'] = slist
1523
1524        if self.local_access.has_key(uri):
1525            # Local access call
1526            req = { 'RequestAccessRequestBody' : req }
1527            r = self.local_access[uri].RequestAccess(req, 
1528                    fedid(file=self.cert_file))
1529            r = { 'RequestAccessResponseBody' : r }
1530        else:
1531            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1532
1533        if r.has_key('RequestAccessResponseBody'):
1534            # Through to here we have a valid response, not a fault.
1535            # Access denied is a fault, so something better or worse than
1536            # access denied has happened.
1537            r = r['RequestAccessResponseBody']
1538            self.log.debug("[get_access] Access granted")
1539        else:
1540            raise service_error(service_error.protocol,
1541                        "Bad proxy response")
1542        if 'proof' not in r:
1543            raise service_error(service_error.protocol,
1544                        "Bad access response (no access proof)")
1545
1546        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1547                tb=tb, uri=uri, proof=[r['proof']], 
1548                services=masters.get(tb, None))
1549
1550        # Collect the responses corresponding to the services this testbed
1551        # exports.  These will be the service requests that we will include in
1552        # the start segment requests (with appropriate visibility values) to
1553        # import and export the segments.
1554        for s in r.get('service', []):
1555            id = s.get('id', None)
1556            # Note that this attaches the response to the object in the masters
1557            # data structure.  (The e_keys index disappears when this fcn
1558            # returns)
1559            if id and id in e_keys:
1560                e_keys[id].reqs.append(s)
1561
1562        # Add attributes to parameter space.  We don't allow attributes to
1563        # overlay any parameters already installed.
1564        for a in r.get('fedAttr', []):
1565            try:
1566                if a['attribute']:
1567                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1568            except KeyError:
1569                self.log.error("Bad attribute in response: %s" % a)
1570
1571
1572    def split_topology(self, top, topo, testbeds):
1573        """
1574        Create the sub-topologies that are needed for experiment instantiation.
1575        """
1576        for tb in testbeds:
1577            topo[tb] = top.clone()
1578            # copy in for loop allows deletions from the original
1579            for e in [ e for e in topo[tb].elements]:
1580                etb = e.get_attribute('testbed')
1581                # NB: elements without a testbed attribute won't appear in any
1582                # sub topologies. 
1583                if not etb or etb != tb:
1584                    for i in e.interface:
1585                        for s in i.subs:
1586                            try:
1587                                s.interfaces.remove(i)
1588                            except ValueError:
1589                                raise service_error(service_error.internal,
1590                                        "Can't remove interface??")
1591                    topo[tb].elements.remove(e)
1592            topo[tb].make_indices()
1593
1594    def confirm_software(self, top):
1595        """
1596        Make sure that the software to be loaded in the topo is all available
1597        before we begin making access requests, etc.  This is a subset of
1598        wrangle_software.
1599        """
1600        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1601        pkgs.update([x.location for e in top.elements for x in e.software])
1602
1603        for pkg in pkgs:
1604            loc = pkg
1605
1606            scheme, host, path = urlparse(loc)[0:3]
1607            dest = os.path.basename(path)
1608            if not scheme:
1609                if not loc.startswith('/'):
1610                    loc = "/%s" % loc
1611                loc = "file://%s" %loc
1612            # NB: if scheme was found, loc == pkg
1613            try:
1614                u = urlopen(loc)
1615                u.close()
1616            except Exception, e:
1617                raise service_error(service_error.req, 
1618                        "Cannot open %s: %s" % (loc, e))
1619        return True
1620
1621    def wrangle_software(self, expid, top, topo, tbparams):
1622        """
1623        Copy software out to the repository directory, allocate permissions and
1624        rewrite the segment topologies to look for the software in local
1625        places.
1626        """
1627
1628        # Copy the rpms and tarfiles to a distribution directory from
1629        # which the federants can retrieve them
1630        linkpath = "%s/software" %  expid
1631        softdir ="%s/%s" % ( self.repodir, linkpath)
1632        softmap = { }
1633
1634        # self.fedkit and self.gateway kit are lists of tuples of
1635        # (install_location, download_location) this extracts the download
1636        # locations.
1637        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1638        pkgs.update([x.location for e in top.elements for x in e.software])
1639        try:
1640            os.makedirs(softdir)
1641        except EnvironmentError, e:
1642            raise service_error(
1643                    "Cannot create software directory: %s" % e)
1644        # The actual copying.  Everything's converted into a url for copying.
1645        auth_attrs = set()
1646        for pkg in pkgs:
1647            loc = pkg
1648
1649            scheme, host, path = urlparse(loc)[0:3]
1650            dest = os.path.basename(path)
1651            if not scheme:
1652                if not loc.startswith('/'):
1653                    loc = "/%s" % loc
1654                loc = "file://%s" %loc
1655            # NB: if scheme was found, loc == pkg
1656            try:
1657                u = urlopen(loc)
1658            except Exception, e:
1659                raise service_error(service_error.req, 
1660                        "Cannot open %s: %s" % (loc, e))
1661            try:
1662                f = open("%s/%s" % (softdir, dest) , "w")
1663                self.log.debug("Writing %s/%s" % (softdir,dest) )
1664                data = u.read(4096)
1665                while data:
1666                    f.write(data)
1667                    data = u.read(4096)
1668                f.close()
1669                u.close()
1670            except Exception, e:
1671                raise service_error(service_error.internal,
1672                        "Could not copy %s: %s" % (loc, e))
1673            path = re.sub("/tmp", "", linkpath)
1674            # XXX
1675            softmap[pkg] = \
1676                    "%s/%s/%s" %\
1677                    ( self.repo_url, path, dest)
1678
1679            # Allow the individual segments to access the software by assigning
1680            # an attribute to each testbed allocation that encodes the data to
1681            # be released.  This expression collects the data for each run of
1682            # the loop.
1683            auth_attrs.update([
1684                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1685                        for tb in tbparams.keys()])
1686
1687        self.append_experiment_authorization(expid, auth_attrs)
1688
1689        # Convert the software locations in the segments into the local
1690        # copies on this host
1691        for soft in [ s for tb in topo.values() \
1692                for e in tb.elements \
1693                    if getattr(e, 'software', False) \
1694                        for s in e.software ]:
1695            if softmap.has_key(soft.location):
1696                soft.location = softmap[soft.location]
1697
1698
1699    def new_experiment(self, req, fid):
1700        """
1701        The external interface to empty initial experiment creation called from
1702        the dispatcher.
1703
1704        Creates a working directory, splits the incoming description using the
1705        splitter script and parses out the avrious subsections using the
1706        lcasses above.  Once each sub-experiment is created, use pooled threads
1707        to instantiate them and start it all up.
1708        """
1709        self.log.info("New experiment call started for %s" % fid)
1710        req = req.get('NewRequestBody', None)
1711        if not req:
1712            raise service_error(service_error.req,
1713                    "Bad request format (no NewRequestBody)")
1714
1715        # import may partially succeed so always save credentials and warn
1716        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1717            self.log.debug("Failed to import delegation credentials(!)")
1718        self.get_grouper_updates(fid)
1719        self.auth.update()
1720        self.auth.save()
1721
1722        try:
1723            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1724                    with_proof=True)
1725        except service_error, e:
1726            self.log.info("New experiment call for %s: access denied" % fid)
1727            raise e
1728
1729
1730        if not access_ok:
1731            self.log.info("New experiment call for %s: Access denied" % fid)
1732            raise service_error(service_error.access, "New access denied",
1733                    proof=[proof])
1734
1735        try:
1736            tmpdir = tempfile.mkdtemp(prefix="split-")
1737        except EnvironmentError:
1738            raise service_error(service_error.internal, "Cannot create tmp dir")
1739
1740        # Generate an ID for the experiment (slice) and a certificate that the
1741        # allocator can use to prove they own it.  We'll ship it back through
1742        # the encrypted connection.  If the requester supplied one, use it.
1743        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1744            expcert = req['experimentAccess']['X509']
1745            expid = fedid(certstr=expcert)
1746            self.state_lock.acquire()
1747            if expid in self.state:
1748                self.state_lock.release()
1749                raise service_error(service_error.req, 
1750                        'fedid %s identifies an existing experiment' % expid)
1751            self.state_lock.release()
1752        else:
1753            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1754
1755        #now we're done with the tmpdir, and it should be empty
1756        if self.cleanup:
1757            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1758            os.rmdir(tmpdir)
1759        else:
1760            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1761
1762        eid = self.create_experiment_state(fid, req, expid, expcert, 
1763                state='empty')
1764
1765        rv = {
1766                'experimentID': [
1767                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1768                ],
1769                'experimentStatus': 'empty',
1770                'experimentAccess': { 'X509' : expcert },
1771                'proof': proof.to_dict(),
1772            }
1773
1774        self.log.info("New experiment call succeeded for %s" % fid)
1775        return rv
1776
1777    # create_experiment sub-functions
1778
1779    @staticmethod
1780    def get_experiment_key(req, field='experimentID'):
1781        """
1782        Parse the experiment identifiers out of the request (the request body
1783        tag has been removed).  Specifically this pulls either the fedid or the
1784        localname out of the experimentID field.  A fedid is preferred.  If
1785        neither is present or the request does not contain the fields,
1786        service_errors are raised.
1787        """
1788        # Get the experiment access
1789        exp = req.get(field, None)
1790        if exp:
1791            if exp.has_key('fedid'):
1792                key = exp['fedid']
1793            elif exp.has_key('localname'):
1794                key = exp['localname']
1795            else:
1796                raise service_error(service_error.req, "Unknown lookup type")
1797        else:
1798            raise service_error(service_error.req, "No request?")
1799
1800        return key
1801
1802    def get_experiment_ids_and_start(self, key, tmpdir):
1803        """
1804        Get the experiment name, id and access certificate from the state, and
1805        set the experiment state to 'starting'.  returns a triple (fedid,
1806        localname, access_cert_file). The access_cert_file is a copy of the
1807        contents of the access certificate, created in the tempdir with
1808        restricted permissions.  If things are confused, raise an exception.
1809        """
1810
1811        expid = eid = None
1812        self.state_lock.acquire()
1813        if key in self.state:
1814            exp = self.state[key]
1815            exp.status = "starting"
1816            exp.updated()
1817            expid = exp.fedid
1818            eid = exp.localname
1819            expcert = exp.identity
1820        self.state_lock.release()
1821
1822        # make a protected copy of the access certificate so the experiment
1823        # controller can act as the experiment principal.
1824        if expcert:
1825            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1826            if not expcert_file:
1827                raise service_error(service_error.internal, 
1828                        "Cannot create temp cert file?")
1829        else:
1830            expcert_file = None
1831
1832        return (eid, expid, expcert_file)
1833
1834    def get_topology(self, req, tmpdir):
1835        """
1836        Get the ns2 content and put it into a file for parsing.  Call the local
1837        or remote parser and return the topdl.Topology.  Errors result in
1838        exceptions.  req is the request and tmpdir is a work directory.
1839        """
1840
1841        # The tcl parser needs to read a file so put the content into that file
1842        descr=req.get('experimentdescription', None)
1843        if descr:
1844            if 'ns2description' in descr:
1845                file_content=descr['ns2description']
1846            elif 'topdldescription' in descr:
1847                return topdl.Topology(**descr['topdldescription'])
1848            else:
1849                raise service_error(service_error.req, 
1850                        'Unknown experiment description type')
1851        else:
1852            raise service_error(service_error.req, "No experiment description")
1853
1854
1855        if self.splitter_url:
1856            self.log.debug("Calling remote topdl translator at %s" % \
1857                    self.splitter_url)
1858            top = self.remote_ns2topdl(self.splitter_url, file_content)
1859        else:
1860            tclfile = os.path.join(tmpdir, "experiment.tcl")
1861            if file_content:
1862                try:
1863                    f = open(tclfile, 'w')
1864                    f.write(file_content)
1865                    f.close()
1866                except EnvironmentError:
1867                    raise service_error(service_error.internal,
1868                            "Cannot write temp experiment description")
1869            else:
1870                raise service_error(service_error.req, 
1871                        "Only ns2descriptions supported")
1872            pid = "dummy"
1873            gid = "dummy"
1874            eid = "dummy"
1875
1876            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1877                str(self.muxmax), '-m', 'dummy']
1878
1879            tclcmd.extend([pid, gid, eid, tclfile])
1880
1881            self.log.debug("running local splitter %s", " ".join(tclcmd))
1882            # This is just fantastic.  As a side effect the parser copies
1883            # tb_compat.tcl into the current directory, so that directory
1884            # must be writable by the fedd user.  Doing this in the
1885            # temporary subdir ensures this is the case.
1886            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1887                    cwd=tmpdir)
1888            split_data = tclparser.stdout
1889
1890            top = topdl.topology_from_xml(file=split_data, top="experiment")
1891            os.remove(tclfile)
1892
1893        return top
1894
1895    def get_testbed_services(self, req, testbeds):
1896        """
1897        Parse the services section of the request into two dicts mapping
1898        testbed to lists of federated_service objects.  The first dict maps all
1899        exporters of services to those service objects, the second maps
1900        testbeds to service objects only for services requiring portals.
1901        """
1902
1903        # Sanity check the services.  Exports or imports from unknown testbeds
1904        # cause an exception.
1905        for s in req.get('service', []):
1906            for t in s.get('import', []):
1907                if t not in testbeds:
1908                    raise service_error(service_error.req, 
1909                            'Service import to unknown testbed: %s' %t)
1910            for t in s.get('export', []):
1911                if t not in testbeds:
1912                    raise service_error(service_error.req, 
1913                            'Service export from unknown testbed: %s' %t)
1914
1915
1916        # We construct both dicts here because deriving the second is more
1917        # complex than it looks - both the keys and lists can differ, and it's
1918        # much easier to generate both in one pass.
1919        masters = { }
1920        pmasters = { }
1921        for s in req.get('service', []):
1922            # If this is a service request with the importall field
1923            # set, fill it out.
1924
1925            if s.get('importall', False):
1926                s['import'] = [ tb for tb in testbeds \
1927                        if tb not in s.get('export',[])]
1928                del s['importall']
1929
1930            # Add the service to masters
1931            for tb in s.get('export', []):
1932                if s.get('name', None):
1933
1934                    params = { }
1935                    for a in s.get('fedAttr', []):
1936                        params[a.get('attribute', '')] = a.get('value','')
1937
1938                    fser = federated_service(name=s['name'],
1939                            exporter=tb, importers=s.get('import',[]),
1940                            params=params)
1941                    if fser.name == 'hide_hosts' \
1942                            and 'hosts' not in fser.params:
1943                        fser.params['hosts'] = \
1944                                ",".join(tb_hosts.get(fser.exporter, []))
1945                    if tb in masters: masters[tb].append(fser)
1946                    else: masters[tb] = [fser]
1947
1948                    if fser.portal:
1949                        if tb in pmasters: pmasters[tb].append(fser)
1950                        else: pmasters[tb] = [fser]
1951                else:
1952                    self.log.error('Testbed service does not have name " + \
1953                            "and importers')
1954        return masters, pmasters
1955
1956    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1957        """
1958        Create the ssh keys necessary for interconnecting the portal nodes and
1959        the global hosts file for letting each segment know about the IP
1960        addresses in play.  If we have computed static routes, copy them into
1961        the repo. Save these into the repo.  Add attributes to the autorizer
1962        allowing access controllers to download them and return a set of
1963        attributes that inform the segments where to find this stuff.  May
1964        raise service_errors in if there are problems.
1965        """
1966        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1967        gw_secretkey_base = "fed.%s" % self.ssh_type
1968        keydir = os.path.join(tmpdir, 'keys')
1969        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1970        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1971        route = os.path.join(tmpdir, 'route.tgz')
1972
1973        try:
1974            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1975        except ValueError:
1976            raise service_error(service_error.server_config, 
1977                    "Bad key type (%s)" % self.ssh_type)
1978
1979        self.generate_seer_certs(keydir)
1980
1981        # Copy configuration files into the remote file store
1982        # The config urlpath
1983        configpath = "/%s/config" % expid
1984        # The config file system location
1985        configdir ="%s%s" % ( self.repodir, configpath)
1986        route_conf = os.path.join(configdir, 'route.tgz')
1987        try:
1988            os.makedirs(configdir)
1989        except EnvironmentError, e:
1990            raise service_error(service_error.internal,
1991                    "Cannot create config directory: %s" % e)
1992        try:
1993            f = open("%s/hosts" % configdir, "w")
1994            print >> f, string.join(hosts, '\n')
1995            f.close()
1996        except EnvironmentError, e:
1997            raise service_error(service_error.internal, 
1998                    "Cannot write hosts file: %s" % e)
1999        try:
2000            if os.path.exists(route):
2001                copy_file(route, route_conf)
2002
2003            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
2004            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
2005            copy_file(os.path.join(keydir, 'ca.pem'), 
2006                    os.path.join(configdir, 'ca.pem'))
2007            copy_file(os.path.join(keydir, 'node.pem'), 
2008                    os.path.join(configdir, 'node.pem'))
2009        except EnvironmentError, e:
2010            raise service_error(service_error.internal, 
2011                    "Cannot copy keyfiles: %s" % e)
2012
2013        # Allow the individual testbeds to access the configuration files,
2014        # again by setting an attribute for the relevant pathnames on each
2015        # allocation principal.  Yeah, that's a long list comprehension.
2016        self.append_experiment_authorization(expid, set([
2017            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2018                    for tb in tbparams.keys() \
2019                        for f in ("hosts", 'ca.pem', 'node.pem', 'route.tgz',
2020                            gw_secretkey_base, gw_pubkey_base)]))
2021
2022        attrs = [ 
2023                {
2024                    'attribute': 'ssh_pubkey', 
2025                    'value': '%s/%s/config/%s' % \
2026                            (self.repo_url, expid, gw_pubkey_base)
2027                },
2028                {
2029                    'attribute': 'ssh_secretkey', 
2030                    'value': '%s/%s/config/%s' % \
2031                            (self.repo_url, expid, gw_secretkey_base)
2032                },
2033                {
2034                    'attribute': 'hosts', 
2035                    'value': '%s/%s/config/hosts' % \
2036                            (self.repo_url, expid)
2037                },
2038                {
2039                    'attribute': 'seer_ca_pem', 
2040                    'value': '%s/%s/config/%s' % \
2041                            (self.repo_url, expid, 'ca.pem')
2042                },
2043                {
2044                    'attribute': 'seer_node_pem', 
2045                    'value': '%s/%s/config/%s' % \
2046                            (self.repo_url, expid, 'node.pem')
2047                },
2048            ]
2049        # Add info about static routes if we have some
2050        if os.path.exists(route_conf):
2051            attrs.append(
2052                {
2053                    'attribute': 'route.tgz', 
2054                    'value': '%s/%s/config/%s' % \
2055                            (self.repo_url, expid, 'route.tgz')
2056                }
2057            )
2058        return attrs
2059
2060
2061    def get_vtopo(self, req, fid):
2062        """
2063        Return the stored virtual topology for this experiment
2064        """
2065        rv = None
2066        state = None
2067        self.log.info("vtopo call started for %s" %  fid)
2068
2069        req = req.get('VtopoRequestBody', None)
2070        if not req:
2071            raise service_error(service_error.req,
2072                    "Bad request format (no VtopoRequestBody)")
2073        exp = req.get('experiment', None)
2074        if exp:
2075            if exp.has_key('fedid'):
2076                key = exp['fedid']
2077                keytype = "fedid"
2078            elif exp.has_key('localname'):
2079                key = exp['localname']
2080                keytype = "localname"
2081            else:
2082                raise service_error(service_error.req, "Unknown lookup type")
2083        else:
2084            raise service_error(service_error.req, "No request?")
2085
2086        try:
2087            proof = self.check_experiment_access(fid, key)
2088        except service_error, e:
2089            self.log.info("vtopo call failed for %s: access denied" %  fid)
2090            raise e
2091
2092        self.state_lock.acquire()
2093        # XXX: this needs to be recalculated
2094        if key in self.state:
2095            if self.state[key].top is not None:
2096                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2097                rv = { 'experiment' : {keytype: key },
2098                        'vtopo': vtopo,
2099                        'proof': proof.to_dict(), 
2100                    }
2101            else:
2102                state = self.state[key].status
2103        self.state_lock.release()
2104
2105        if rv: 
2106            self.log.info("vtopo call completed for %s %s " % \
2107                (key, fid))
2108            return rv
2109        else: 
2110            if state:
2111                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2112                    (key, fid))
2113                raise service_error(service_error.partial, 
2114                        "Not ready: %s" % state)
2115            else:
2116                self.log.info("vtopo call completed for %s %s (No experiment)"\
2117                        % (key, fid))
2118                raise service_error(service_error.req, "No such experiment")
2119
2120    def get_vis(self, req, fid):
2121        """
2122        Return the stored visualization for this experiment
2123        """
2124        rv = None
2125        state = None
2126
2127        self.log.info("vis call started for %s" %  fid)
2128        req = req.get('VisRequestBody', None)
2129        if not req:
2130            raise service_error(service_error.req,
2131                    "Bad request format (no VisRequestBody)")
2132        exp = req.get('experiment', None)
2133        if exp:
2134            if exp.has_key('fedid'):
2135                key = exp['fedid']
2136                keytype = "fedid"
2137            elif exp.has_key('localname'):
2138                key = exp['localname']
2139                keytype = "localname"
2140            else:
2141                raise service_error(service_error.req, "Unknown lookup type")
2142        else:
2143            raise service_error(service_error.req, "No request?")
2144
2145        try:
2146            proof = self.check_experiment_access(fid, key)
2147        except service_error, e:
2148            self.log.info("vis call failed for %s: access denied" %  fid)
2149            raise e
2150
2151        self.state_lock.acquire()
2152        # Generate the visualization
2153        if key in self.state:
2154            if self.state[key].top is not None:
2155                try:
2156                    vis = self.genviz(
2157                            topdl.topology_to_vtopo(self.state[key].top))
2158                except service_error, e:
2159                    self.state_lock.release()
2160                    raise e
2161                rv =  { 'experiment' : {keytype: key },
2162                        'vis': vis,
2163                        'proof': proof.to_dict(), 
2164                        }
2165            else:
2166                state = self.state[key].status
2167        self.state_lock.release()
2168
2169        if rv: 
2170            self.log.info("vis call completed for %s %s " % \
2171                (key, fid))
2172            return rv
2173        else:
2174            if state:
2175                self.log.info("vis call completed for %s %s (not ready)" % \
2176                    (key, fid))
2177                raise service_error(service_error.partial, 
2178                        "Not ready: %s" % state)
2179            else:
2180                self.log.info("vis call completed for %s %s (no experiment)" % \
2181                    (key, fid))
2182                raise service_error(service_error.req, "No such experiment")
2183
2184    @staticmethod
2185    def needs_route_computation(req):
2186        '''
2187        Walk the request services looking for a static routing request
2188        '''
2189        for s in req.get('service', []):
2190            if s.get('name', '') == 'static_routing':
2191                return True
2192        return False
2193
2194    def compute_static_routes(self, tmpdir, top):
2195        '''
2196        Compute a set of static routes for the topology.  The result is a file
2197        called route.tgz in tmpdir that contains a dinrectory, route, with a
2198        file per node in the topology that lists the prefix and router for all
2199        the routes in that node's routing table.  Exceptions from the route
2200        calculation program and the tar creation call are not caught.
2201        '''
2202        if self.routing is None:
2203            raise service_error(service_error.server, 
2204                    'Cannot provide staticroutes, no routing program specified')
2205        rg = tempfile.NamedTemporaryFile()
2206        outdir = os.path.join(tmpdir, 'route')
2207        tarfile = os.path.join(tmpdir, 'route.tgz')
2208        topology_to_route_file(top, file=rg)
2209        os.mkdir(outdir)
2210        try:
2211            for cmd in (
2212                    [self.routing, '--input', rg.name,'--output', outdir],
2213                    ['tar', '-C', tmpdir, '-czf', tarfile, 'route']):
2214                subprocess.check_call(cmd)
2215        except subprocess.CalledProcessError, e:
2216            raise service_error(service_error.internal, 
2217                    'Cannot call %s: %s' % (' '.join(cmd), e.returncode))
2218        rg.close()
2219        shutil.rmtree(outdir)
2220
2221
2222   
2223    def save_federant_information(self, allocated, tbparams, eid, top):
2224        """
2225        Store the various data that have changed in the experiment state
2226        between when it was started and the beginning of resource allocation.
2227        This is basically the information about each local allocation.  This
2228        fills in the values of the placeholder allocation in the state.  It
2229        also collects the access proofs and returns them as dicts for a
2230        response message.
2231        """
2232        self.state_lock.acquire()
2233        exp = self.state[eid]
2234        exp.top = top.clone()
2235        # save federant information
2236        for k in allocated.keys():
2237            exp.add_allocation(tbparams[k])
2238            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2239                type="testbed", localname=[k], 
2240                service=[ s.to_topdl() for s in tbparams[k].services]))
2241
2242        # Access proofs for the response message
2243        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2244                    for p in tbparams[k].proof]
2245        exp.updated()
2246        if self.state_filename: 
2247            self.write_state()
2248        self.state_lock.release()
2249        return proofs
2250
2251    def clear_placeholder(self, eid, expid, tmpdir):
2252        """
2253        Clear the placeholder and remove any allocated temporary dir.
2254        """
2255
2256        self.state_lock.acquire()
2257        del self.state[eid]
2258        del self.state[expid]
2259        if self.state_filename: self.write_state()
2260        self.state_lock.release()
2261        if tmpdir and self.cleanup:
2262            self.remove_dirs(tmpdir)
2263
2264    # end of create_experiment sub-functions
2265
2266    def create_experiment(self, req, fid):
2267        """
2268        The external interface to experiment creation called from the
2269        dispatcher.
2270
2271        Creates a working directory, splits the incoming description using the
2272        splitter script and parses out the various subsections using the
2273        classes above.  Once each sub-experiment is created, use pooled threads
2274        to instantiate them and start it all up.
2275        """
2276
2277        self.log.info("Create experiment call started for %s" % fid)
2278        req = req.get('CreateRequestBody', None)
2279        if req:
2280            key = self.get_experiment_key(req)
2281        else:
2282            raise service_error(service_error.req,
2283                    "Bad request format (no CreateRequestBody)")
2284
2285        # Import information from the requester
2286        # import may partially succeed so always save credentials and warn
2287        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2288            self.log.debug("Failed to import delegation credentials(!)")
2289        self.get_grouper_updates(fid)
2290        self.auth.update()
2291        self.auth.save()
2292
2293        try:
2294            # Make sure that the caller can talk to us
2295            proof = self.check_experiment_access(fid, key)
2296        except service_error, e:
2297            self.log.info("Create experiment call failed for %s: access denied"\
2298                    % fid)
2299            raise e
2300
2301
2302        # Install the testbed map entries supplied with the request into a copy
2303        # of the testbed map.
2304        tbmap = dict(self.tbmap)
2305        for m in req.get('testbedmap', []):
2306            if 'testbed' in m and 'uri' in m:
2307                tbmap[m['testbed']] = m['uri']
2308
2309        # a place to work
2310        try:
2311            tmpdir = tempfile.mkdtemp(prefix="split-")
2312            os.mkdir(tmpdir+"/keys")
2313        except EnvironmentError:
2314            raise service_error(service_error.internal, "Cannot create tmp dir")
2315
2316        tbparams = { }
2317
2318        eid, expid, expcert_file = \
2319                self.get_experiment_ids_and_start(key, tmpdir)
2320
2321        # This catches exceptions to clear the placeholder if necessary
2322        try: 
2323            if not (eid and expid):
2324                raise service_error(service_error.internal, 
2325                        "Cannot find local experiment info!?")
2326
2327            top = self.get_topology(req, tmpdir)
2328            self.confirm_software(top)
2329            # Assign the IPs
2330            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2331            if self.needs_route_computation(req):
2332                self.compute_static_routes(tmpdir, top)
2333            # Find the testbeds to look up
2334            tb_hosts = { }
2335            testbeds = [ ]
2336            for e in top.elements:
2337                if isinstance(e, topdl.Computer):
2338                    tb = e.get_attribute('testbed')
2339                    # Put nodes not in a testbed into the default testbed if
2340                    # there is one.
2341                    if tb is None:
2342                        if self.default_tb is None:
2343                            raise service_error(service_error.req, 
2344                                '%s not in a testbed (and no default)' % e.name)
2345                        tb = self.default_tb
2346                        e.set_attribute('testbed', tb)
2347                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2348                    else: 
2349                        tb_hosts[tb] = [ e.name ]
2350                        testbeds.append(tb)
2351
2352            masters, pmasters = self.get_testbed_services(req, testbeds)
2353            allocated = { }         # Testbeds we can access
2354            topo ={ }               # Sub topologies
2355            connInfo = { }          # Connection information
2356
2357            self.get_access_to_testbeds(testbeds, fid, allocated, 
2358                    tbparams, masters, tbmap, expid, expcert_file)
2359
2360            # tbactive is the set of testbeds that have NATs in front of their
2361            # portals. They need to initiate connections.
2362            tbactive = set([k for k, v in tbparams.items() \
2363                    if v.get_attribute('nat_portals')])
2364
2365            self.split_topology(top, topo, testbeds)
2366
2367            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2368
2369            part = experiment_partition(self.auth, self.store_url, tbmap,
2370                    self.muxmax, self.direct_transit, tbactive)
2371            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2372                    connInfo, expid)
2373
2374            auth_attrs = set()
2375            # Now get access to the dynamic testbeds (those added above)
2376            for tb in [ t for t in topo if t not in allocated]:
2377                self.get_access(tb, tbparams, fid, masters, tbmap, 
2378                        expid, expcert_file)
2379                allocated[tb] = 1
2380                store_keys = topo[tb].get_attribute('store_keys')
2381                # Give the testbed access to keys it exports or imports
2382                if store_keys:
2383                    auth_attrs.update(set([
2384                        (tbparams[tb].allocID, sk) \
2385                                for sk in store_keys.split(" ")]))
2386
2387            if auth_attrs:
2388                self.append_experiment_authorization(expid, auth_attrs)
2389
2390            # transit and disconnected testbeds may not have a connInfo entry.
2391            # Fill in the blanks.
2392            for t in allocated.keys():
2393                if not connInfo.has_key(t):
2394                    connInfo[t] = { }
2395
2396            self.wrangle_software(expid, top, topo, tbparams)
2397
2398            proofs = self.save_federant_information(allocated, tbparams, 
2399                    eid, top)
2400        except service_error, e:
2401            # If something goes wrong in the parse (usually an access error)
2402            # clear the placeholder state.  From here on out the code delays
2403            # exceptions.  Failing at this point returns a fault to the remote
2404            # caller.
2405
2406            self.log.info("Create experiment call failed for %s %s: %s" % 
2407                    (eid, fid, e))
2408            self.clear_placeholder(eid, expid, tmpdir)
2409            raise e
2410
2411        # Start the background swapper and return the starting state.  From
2412        # here on out, the state will stick around a while.
2413
2414        # Create a logger that logs to the experiment's state object as well as
2415        # to the main log file.
2416        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2417        alloc_collector = self.list_log(self.state[eid].log)
2418        h = logging.StreamHandler(alloc_collector)
2419        # XXX: there should be a global one of these rather than repeating the
2420        # code.
2421        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2422                    '%d %b %y %H:%M:%S'))
2423        alloc_log.addHandler(h)
2424
2425        # Start a thread to do the resource allocation
2426        t  = Thread(target=self.allocate_resources,
2427                args=(allocated, masters, eid, expid, tbparams, 
2428                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2429                    connInfo, tbmap, expcert_file),
2430                name=eid)
2431        t.start()
2432
2433        rv = {
2434                'experimentID': [
2435                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2436                ],
2437                'experimentStatus': 'starting',
2438                'proof': [ proof.to_dict() ] + proofs,
2439            }
2440        self.log.info("Create experiment call succeeded for %s %s" % \
2441                (eid, fid))
2442
2443        return rv
2444   
2445    def get_experiment_fedid(self, key):
2446        """
2447        find the fedid associated with the localname key in the state database.
2448        """
2449
2450        rv = None
2451        self.state_lock.acquire()
2452        if key in self.state:
2453            rv = self.state[key].fedid
2454        self.state_lock.release()
2455        return rv
2456
2457    def check_experiment_access(self, fid, key):
2458        """
2459        Confirm that the fid has access to the experiment.  Though a request
2460        may be made in terms of a local name, the access attribute is always
2461        the experiment's fedid.
2462        """
2463        if not isinstance(key, fedid):
2464            key = self.get_experiment_fedid(key)
2465
2466        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2467
2468        if access_ok:
2469            return proof
2470        else:
2471            raise service_error(service_error.access, "Access Denied",
2472                proof)
2473
2474
2475    def get_handler(self, path, fid):
2476        """
2477        Perhaps surprisingly named, this function handles HTTP GET requests to
2478        this server (SOAP requests are POSTs).
2479        """
2480        self.log.info("Get handler %s %s" % (path, fid))
2481        if len("%s" % fid) == 0:
2482            return (None, None)
2483        # XXX: log proofs?
2484        if self.auth.check_attribute(fid, path):
2485            return ("%s/%s" % (self.repodir, path), "application/binary")
2486        else:
2487            return (None, None)
2488
2489    def update_info(self, key, force=False):
2490        top = None
2491        self.state_lock.acquire()
2492        if key in self.state:
2493            if force or self.state[key].older_than(self.info_cache_limit):
2494                top = self.state[key].top
2495                if top is not None: top = top.clone()
2496                d1, info_params, cert, d2 = \
2497                        self.get_segment_info(self.state[key], need_lock=False)
2498        self.state_lock.release()
2499
2500        if top is None: return
2501
2502        try:
2503            tmpdir = tempfile.mkdtemp(prefix="info-")
2504        except EnvironmentError:
2505            raise service_error(service_error.internal, 
2506                    "Cannot create tmp dir")
2507        cert_file = self.make_temp_certfile(cert, tmpdir)
2508
2509        data = []
2510        try:
2511            for k, (uri, aid) in info_params.items():
2512                info=self.info_segment(log=self.log, testbed=uri,
2513                            cert_file=cert_file, cert_pwd=None,
2514                            trusted_certs=self.trusted_certs,
2515                            caller=self.call_InfoSegment)
2516                info(uri, aid)
2517                data.append(info)
2518        # Clean up the tmpdir no matter what
2519        finally:
2520            if tmpdir: self.remove_dirs(tmpdir)
2521
2522        self.annotate_topology(top, data)
2523        self.state_lock.acquire()
2524        if key in self.state:
2525            self.state[key].top = top
2526            self.state[key].updated()
2527            if self.state_filename: self.write_state()
2528        self.state_lock.release()
2529
2530   
2531    def get_info(self, req, fid):
2532        """
2533        Return all the stored info about this experiment
2534        """
2535        rv = None
2536
2537        self.log.info("Info call started for %s" %  fid)
2538        req = req.get('InfoRequestBody', None)
2539        if not req:
2540            raise service_error(service_error.req,
2541                    "Bad request format (no InfoRequestBody)")
2542        exp = req.get('experiment', None)
2543        legacy = req.get('legacy', False)
2544        fresh = req.get('fresh', False)
2545        if exp:
2546            if exp.has_key('fedid'):
2547                key = exp['fedid']
2548                keytype = "fedid"
2549            elif exp.has_key('localname'):
2550                key = exp['localname']
2551                keytype = "localname"
2552            else:
2553                raise service_error(service_error.req, "Unknown lookup type")
2554        else:
2555            raise service_error(service_error.req, "No request?")
2556
2557        try:
2558            proof = self.check_experiment_access(fid, key)
2559        except service_error, e:
2560            self.log.info("Info call failed for %s: access denied" %  fid)
2561
2562
2563        self.update_info(key, fresh)
2564
2565        self.state_lock.acquire()
2566        if self.state.has_key(key):
2567            rv = self.state[key].get_info()
2568            # Copy the topo if we need legacy annotations
2569            if legacy:
2570                top = self.state[key].top
2571                if top is not None: top = top.clone()
2572        self.state_lock.release()
2573        self.log.info("Gathered Info for %s %s" % (key, fid))
2574
2575        # If the legacy visualization and topology representations are
2576        # requested, calculate them and add them to the return.
2577        if legacy and rv is not None:
2578            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2579            if top is not None:
2580                vtopo = topdl.topology_to_vtopo(top)
2581                if vtopo is not None:
2582                    rv['vtopo'] = vtopo
2583                    try:
2584                        vis = self.genviz(vtopo)
2585                    except service_error, e:
2586                        self.log.debug('Problem generating visualization: %s' \
2587                                % e)
2588                        vis = None
2589                    if vis is not None:
2590                        rv['vis'] = vis
2591        if rv:
2592            self.log.info("Info succeded for %s %s" % (key, fid))
2593            rv['proof'] = proof.to_dict()
2594            return rv
2595        else: 
2596            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2597            raise service_error(service_error.req, "No such experiment")
2598
2599    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2600            results):
2601        """
2602        Call OperateSegment on multiple testbeds and gather the results.
2603        op_params contains the parameters needed to contact that testbed, cert
2604        is a certificate containing the fedid to use, op is the operation,
2605        testbeds is a dict mapping testbed name to targets in that testbed,
2606        params are the parameters to include a,d results is a growing list of
2607        the results of the calls.
2608        """
2609        try:
2610            tmpdir = tempfile.mkdtemp(prefix="info-")
2611        except EnvironmentError:
2612            raise service_error(service_error.internal, 
2613                    "Cannot create tmp dir")
2614        cert_file = self.make_temp_certfile(cert, tmpdir)
2615
2616        try:
2617            for tb, targets in testbeds.items():
2618                if tb in op_params:
2619                    uri, aid = op_params[tb]
2620                    operate=self.operation_segment(log=self.log, testbed=uri,
2621                                cert_file=cert_file, cert_pwd=None,
2622                                trusted_certs=self.trusted_certs,
2623                                caller=self.call_OperationSegment)
2624                    if operate(uri, aid, op, targets, params):
2625                        if operate.status is not None:
2626                            results.extend(operate.status)
2627                            continue
2628                # Something went wrong in a weird way.  Add statuses
2629                # that reflect that to results
2630                for t in targets:
2631                    results.append(operation_status(t, 
2632                        operation_status.federant,
2633                        'Unexpected error on %s' % tb))
2634        # Clean up the tmpdir no matter what
2635        finally:
2636            if tmpdir: self.remove_dirs(tmpdir)
2637
2638    def do_operation(self, req, fid):
2639        """
2640        Find the testbeds holding each target and ask them to carry out the
2641        operation.  Return the statuses.
2642        """
2643        # Map an element to the testbed containing it
2644        def element_to_tb(e):
2645            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2646            elif isinstance(e, topdl.Testbed): return e.name
2647            else: return None
2648        # If d is an operation_status object, make it a dict
2649        def make_dict(d):
2650            if isinstance(d, dict): return d
2651            elif isinstance(d, operation_status): return d.to_dict()
2652            else: return { }
2653
2654        def element_name(e):
2655            if isinstance(e, topdl.Computer): return e.name
2656            elif isinstance(e, topdl.Testbed): 
2657                if e.localname: return e.localname[0]
2658                else: return None
2659            else: return None
2660
2661        self.log.info("Operation call started for %s" %  fid)
2662        req = req.get('OperationRequestBody', None)
2663        if not req:
2664            raise service_error(service_error.req,
2665                    "Bad request format (no OperationRequestBody)")
2666        exp = req.get('experiment', None)
2667        op = req.get('operation', None)
2668        targets = set(req.get('target', []))
2669        params = req.get('parameter', None)
2670
2671        if exp:
2672            if 'fedid' in exp:
2673                key = exp['fedid']
2674                keytype = "fedid"
2675            elif 'localname' in exp:
2676                key = exp['localname']
2677                keytype = "localname"
2678            else:
2679                raise service_error(service_error.req, "Unknown lookup type")
2680        else:
2681            raise service_error(service_error.req, "No request?")
2682
2683        if op is None or not targets:
2684            raise service_error(service_error.req, "No request?")
2685
2686        try:
2687            proof = self.check_experiment_access(fid, key)
2688        except service_error, e:
2689            self.log.info("Operation call failed for %s: access denied" %  fid)
2690            raise e
2691
2692        self.state_lock.acquire()
2693        if key in self.state:
2694            d1, op_params, cert, d2 = \
2695                    self.get_segment_info(self.state[key], need_lock=False,
2696                            key='tb')
2697            top = self.state[key].top
2698            if top is not None:
2699                top = top.clone()
2700        self.state_lock.release()
2701
2702        if top is None:
2703            self.log.info("Operation call failed for %s: not active" %  fid)
2704            raise service_error(service_error.partial, "No topology yet", 
2705                    proof=proof)
2706
2707        testbeds = { }
2708        results = []
2709        for e in top.elements:
2710            ename = element_name(e)
2711            if ename in targets:
2712                tb = element_to_tb(e)
2713                targets.remove(ename)
2714                if tb is not None:
2715                    if tb in testbeds: testbeds[tb].append(ename)
2716                    else: testbeds[tb] = [ ename ]
2717                else:
2718                    results.append(operation_status(e.name, 
2719                        code=operation_status.no_target, 
2720                        description='Cannot map target to testbed'))
2721
2722        for t in targets:
2723            results.append(operation_status(t, operation_status.no_target))
2724
2725        self.operate_on_segments(op_params, cert, op, testbeds, params,
2726                results)
2727
2728        self.log.info("Operation call succeeded for %s" %  fid)
2729        return { 
2730                'experiment': exp, 
2731                'status': [make_dict(r) for r in results],
2732                'proof': proof.to_dict()
2733                }
2734
2735
2736    def get_multi_info(self, req, fid):
2737        """
2738        Return all the stored info that this fedid can access
2739        """
2740        rv = { 'info': [ ], 'proof': [ ] }
2741
2742        self.log.info("Multi Info call started for %s" %  fid)
2743        self.get_grouper_updates(fid)
2744        self.auth.update()
2745        self.auth.save()
2746        self.state_lock.acquire()
2747        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2748            try:
2749                proof = self.check_experiment_access(fid, key)
2750            except service_error, e:
2751                if e.code == service_error.access:
2752                    continue
2753                else:
2754                    self.log.info("Multi Info call failed for %s: %s" %  \
2755                            (e,fid))
2756                    self.state_lock.release()
2757                    raise e
2758
2759            if self.state.has_key(key):
2760                e = self.state[key].get_info()
2761                e['proof'] = proof.to_dict()
2762                rv['info'].append(e)
2763                rv['proof'].append(proof.to_dict())
2764        self.state_lock.release()
2765        self.log.info("Multi Info call succeeded for %s" %  fid)
2766        return rv
2767
2768    def check_termination_status(self, fed_exp, force):
2769        """
2770        Confirm that the experiment is sin a valid state to stop (or force it)
2771        return the state - invalid states for deletion and force settings cause
2772        exceptions.
2773        """
2774        self.state_lock.acquire()
2775        status = fed_exp.status
2776
2777        if status:
2778            if status in ('starting', 'terminating'):
2779                if not force:
2780                    self.state_lock.release()
2781                    raise service_error(service_error.partial, 
2782                            'Experiment still being created or destroyed')
2783                else:
2784                    self.log.warning('Experiment in %s state ' % status + \
2785                            'being terminated by force.')
2786            self.state_lock.release()
2787            return status
2788        else:
2789            # No status??? trouble
2790            self.state_lock.release()
2791            raise service_error(service_error.internal,
2792                    "Experiment has no status!?")
2793
2794    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2795        ids = []
2796        term_params = { }
2797        if need_lock: self.state_lock.acquire()
2798        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2799        expcert = fed_exp.identity
2800        repo = "%s" % fed_exp.fedid
2801
2802        # Collect the allocation/segment ids into a dict keyed by the fedid
2803        # of the allocation that contains a tuple of uri, aid
2804        for i, fed in enumerate(fed_exp.get_all_allocations()):
2805            uri = fed.uri
2806            aid = fed.allocID
2807            if key == 'aid': term_params[aid] = (uri, aid)
2808            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2809
2810        if need_lock: self.state_lock.release()
2811        return ids, term_params, expcert, repo
2812
2813
2814    def get_termination_info(self, fed_exp):
2815        self.state_lock.acquire()
2816        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2817        # Change the experiment state
2818        fed_exp.status = 'terminating'
2819        fed_exp.updated()
2820        if self.state_filename: self.write_state()
2821        self.state_lock.release()
2822
2823        return ids, term_params, expcert, repo
2824
2825
2826    def deallocate_resources(self, term_params, expcert, status, force, 
2827            dealloc_log):
2828        tmpdir = None
2829        # This try block makes sure the tempdir is cleared
2830        try:
2831            # If no expcert, try the deallocation as the experiment
2832            # controller instance.
2833            if expcert and self.auth_type != 'legacy': 
2834                try:
2835                    tmpdir = tempfile.mkdtemp(prefix="term-")
2836                except EnvironmentError:
2837                    raise service_error(service_error.internal, 
2838                            "Cannot create tmp dir")
2839                cert_file = self.make_temp_certfile(expcert, tmpdir)
2840                pw = None
2841            else: 
2842                cert_file = self.cert_file
2843                pw = self.cert_pwd
2844
2845            # Stop everyone.  NB, wait_for_all waits until a thread starts
2846            # and then completes, so we can't wait if nothing starts.  So,
2847            # no tbparams, no start.
2848            if len(term_params) > 0:
2849                tp = thread_pool(self.nthreads)
2850                for k, (uri, aid) in term_params.items():
2851                    # Create and start a thread to stop the segment
2852                    tp.wait_for_slot()
2853                    t  = pooled_thread(\
2854                            target=self.terminate_segment(log=dealloc_log,
2855                                testbed=uri,
2856                                cert_file=cert_file, 
2857                                cert_pwd=pw,
2858                                trusted_certs=self.trusted_certs,
2859                                caller=self.call_TerminateSegment),
2860                            args=(uri, aid), name=k,
2861                            pdata=tp, trace_file=self.trace_file)
2862                    t.start()
2863                # Wait for completions
2864                tp.wait_for_all_done()
2865
2866            # release the allocations (failed experiments have done this
2867            # already, and starting experiments may be in odd states, so we
2868            # ignore errors releasing those allocations
2869            try: 
2870                for k, (uri, aid)  in term_params.items():
2871                    self.release_access(None, aid, uri=uri,
2872                            cert_file=cert_file, cert_pwd=pw)
2873            except service_error, e:
2874                if status != 'failed' and not force:
2875                    raise e
2876
2877        # Clean up the tmpdir no matter what
2878        finally:
2879            if tmpdir: self.remove_dirs(tmpdir)
2880
2881    def terminate_experiment(self, req, fid):
2882        """
2883        Swap this experiment out on the federants and delete the shared
2884        information
2885        """
2886        self.log.info("Terminate experiment call started for %s" % fid)
2887        tbparams = { }
2888        req = req.get('TerminateRequestBody', None)
2889        if not req:
2890            raise service_error(service_error.req,
2891                    "Bad request format (no TerminateRequestBody)")
2892
2893        key = self.get_experiment_key(req, 'experiment')
2894        try:
2895            proof = self.check_experiment_access(fid, key)
2896        except service_error, e:
2897            self.log.info(
2898                    "Terminate experiment call failed for %s: access denied" \
2899                            % fid)
2900            raise e
2901        exp = req.get('experiment', False)
2902        force = req.get('force', False)
2903
2904        dealloc_list = [ ]
2905
2906
2907        # Create a logger that logs to the dealloc_list as well as to the main
2908        # log file.
2909        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2910        dealloc_log.info("Terminating %s " %key)
2911        h = logging.StreamHandler(self.list_log(dealloc_list))
2912        # XXX: there should be a global one of these rather than repeating the
2913        # code.
2914        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2915                    '%d %b %y %H:%M:%S'))
2916        dealloc_log.addHandler(h)
2917
2918        self.state_lock.acquire()
2919        fed_exp = self.state.get(key, None)
2920        self.state_lock.release()
2921        repo = None
2922
2923        if fed_exp:
2924            status = self.check_termination_status(fed_exp, force)
2925            # get_termination_info updates the experiment state
2926            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2927            self.deallocate_resources(term_params, expcert, status, force, 
2928                    dealloc_log)
2929
2930            # Remove the terminated experiment
2931            self.state_lock.acquire()
2932            for id in ids:
2933                self.clear_experiment_authorization(id, need_state_lock=False)
2934                if id in self.state: del self.state[id]
2935
2936            if self.state_filename: self.write_state()
2937            self.state_lock.release()
2938
2939            # Delete any synch points associated with this experiment.  All
2940            # synch points begin with the fedid of the experiment.
2941            fedid_keys = set(["fedid:%s" % f for f in ids \
2942                    if isinstance(f, fedid)])
2943            for k in self.synch_store.all_keys():
2944                try:
2945                    if len(k) > 45 and k[0:46] in fedid_keys:
2946                        self.synch_store.del_value(k)
2947                except synch_store.BadDeletionError:
2948                    pass
2949            self.write_store()
2950
2951            # Remove software and other cached stuff from the filesystem.
2952            if repo:
2953                self.remove_dirs("%s/%s" % (self.repodir, repo))
2954       
2955            self.log.info("Terminate experiment succeeded for %s %s" % \
2956                    (key, fid))
2957            return { 
2958                    'experiment': exp , 
2959                    'deallocationLog': string.join(dealloc_list, ''),
2960                    'proof': [proof.to_dict()],
2961                    }
2962        else:
2963            self.log.info("Terminate experiment failed for %s %s: no state" % \
2964                    (key, fid))
2965            raise service_error(service_error.req, "No saved state")
2966
2967
2968    def GetValue(self, req, fid):
2969        """
2970        Get a value from the synchronized store
2971        """
2972        req = req.get('GetValueRequestBody', None)
2973        if not req:
2974            raise service_error(service_error.req,
2975                    "Bad request format (no GetValueRequestBody)")
2976       
2977        name = req.get('name', None)
2978        wait = req.get('wait', False)
2979        rv = { 'name': name }
2980
2981        if not name:
2982            raise service_error(service_error.req, "No name?")
2983
2984        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2985
2986        if access_ok:
2987            self.log.debug("[GetValue] asking for %s " % name)
2988            try:
2989                v = self.synch_store.get_value(name, wait)
2990            except synch_store.RevokedKeyError:
2991                # No more synch on this key
2992                raise service_error(service_error.federant, 
2993                        "Synch key %s revoked" % name)
2994            if v is not None:
2995                rv['value'] = v
2996            rv['proof'] = proof.to_dict()
2997            self.log.debug("[GetValue] got %s from %s" % (v, name))
2998            return rv
2999        else:
3000            raise service_error(service_error.access, "Access Denied",
3001                    proof=proof)
3002       
3003
3004    def SetValue(self, req, fid):
3005        """
3006        Set a value in the synchronized store
3007        """
3008        req = req.get('SetValueRequestBody', None)
3009        if not req:
3010            raise service_error(service_error.req,
3011                    "Bad request format (no SetValueRequestBody)")
3012       
3013        name = req.get('name', None)
3014        v = req.get('value', '')
3015
3016        if not name:
3017            raise service_error(service_error.req, "No name?")
3018
3019        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
3020
3021        if access_ok:
3022            try:
3023                self.synch_store.set_value(name, v)
3024                self.write_store()
3025                self.log.debug("[SetValue] set %s to %s" % (name, v))
3026            except synch_store.CollisionError:
3027                # Translate into a service_error
3028                raise service_error(service_error.req,
3029                        "Value already set: %s" %name)
3030            except synch_store.RevokedKeyError:
3031                # No more synch on this key
3032                raise service_error(service_error.federant, 
3033                        "Synch key %s revoked" % name)
3034                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
3035        else:
3036            raise service_error(service_error.access, "Access Denied",
3037                    proof=proof)
Note: See TracBrowser for help on using the repository browser.